package cn.ponfee.disjob.supervisor.service;

import cn.ponfee.disjob.common.base.IdGenerator;
import cn.ponfee.disjob.common.base.LazyLoader;
import cn.ponfee.disjob.common.collect.Collects;
import cn.ponfee.disjob.common.dag.DAGEdge;
import cn.ponfee.disjob.common.dag.DAGNode;
import cn.ponfee.disjob.common.spring.TransactionUtils;
import cn.ponfee.disjob.common.tuple.Tuple2;
import cn.ponfee.disjob.common.tuple.Tuple3;
import cn.ponfee.disjob.common.util.Functions;
import cn.ponfee.disjob.common.util.Jsons;
import cn.ponfee.disjob.common.util.Strings;
import cn.ponfee.disjob.core.base.Worker;
import cn.ponfee.disjob.core.enums.ExecuteState;
import cn.ponfee.disjob.core.enums.JobState;
import cn.ponfee.disjob.core.enums.Operations;
import cn.ponfee.disjob.core.enums.RetryType;
import cn.ponfee.disjob.core.enums.RouteStrategy;
import cn.ponfee.disjob.core.enums.RunState;
import cn.ponfee.disjob.core.enums.RunType;
import cn.ponfee.disjob.core.exception.JobException;
import cn.ponfee.disjob.core.model.InstanceAttach;
import cn.ponfee.disjob.core.model.SchedDepend;
import cn.ponfee.disjob.core.model.SchedInstance;
import cn.ponfee.disjob.core.model.SchedJob;
import cn.ponfee.disjob.core.model.SchedTask;
import cn.ponfee.disjob.core.model.SchedWorkflow;
import cn.ponfee.disjob.core.param.ExecuteTaskParam;
import cn.ponfee.disjob.core.param.JobHandlerParam;
import cn.ponfee.disjob.core.param.StartTaskParam;
import cn.ponfee.disjob.core.param.TaskWorkerParam;
import cn.ponfee.disjob.core.param.TerminateTaskParam;
import cn.ponfee.disjob.dispatch.TaskDispatcher;
import cn.ponfee.disjob.registry.SupervisorRegistry;
import cn.ponfee.disjob.supervisor.base.WorkerCoreRpcClient;
import cn.ponfee.disjob.supervisor.dag.WorkflowGraph;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedDependMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedInstanceMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedJobMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedTaskMapper;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedWorkflowMapper;
import cn.ponfee.disjob.supervisor.instance.NormalInstanceCreator;
import cn.ponfee.disjob.supervisor.instance.TriggerInstance;
import cn.ponfee.disjob.supervisor.instance.TriggerInstanceCreator;
import cn.ponfee.disjob.supervisor.instance.WorkflowInstanceCreator;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

@Component
/* loaded from: input_file:cn/ponfee/disjob/supervisor/service/DistributedJobManager.class */
public class DistributedJobManager extends AbstractJobManager {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedJobManager.class);

    /** Weak interner yielding one canonical {@code Long} per id; that object is the {@code synchronized} monitor. */
    private static final Interner<Long> INTERNER_POOL = Interners.newWeakInterner();

    // Integer values of the RunState groups / single states used by this class.
    private static final List<Integer> RUN_STATE_TERMINABLE = Collects.convert(RunState.TERMINABLE_LIST, RunState::value);
    private static final List<Integer> RUN_STATE_RUNNABLE = Collects.convert(RunState.RUNNABLE_LIST, RunState::value);
    private static final List<Integer> RUN_STATE_PAUSABLE = Collects.convert(RunState.PAUSABLE_LIST, RunState::value);
    private static final List<Integer> RUN_STATE_WAITING = Collections.singletonList(RunState.WAITING.value());
    private static final List<Integer> RUN_STATE_RUNNING = Collections.singletonList(RunState.RUNNING.value());
    private static final List<Integer> RUN_STATE_PAUSED = Collections.singletonList(RunState.PAUSED.value());

    // Integer values of the ExecuteState groups / single states used by this class.
    private static final List<Integer> EXECUTE_STATE_EXECUTABLE = Collects.convert(ExecuteState.EXECUTABLE_LIST, ExecuteState::value);
    private static final List<Integer> EXECUTE_STATE_PAUSABLE = Collects.convert(ExecuteState.PAUSABLE_LIST, ExecuteState::value);
    private static final List<Integer> EXECUTE_STATE_WAITING = Collections.singletonList(ExecuteState.WAITING.value());
    private static final List<Integer> EXECUTE_STATE_PAUSED = Collections.singletonList(ExecuteState.PAUSED.value());

    // Collaborators injected through the constructor.
    private final TransactionTemplate transactionTemplate;
    private final SchedInstanceMapper instanceMapper;
    private final SchedTaskMapper taskMapper;
    private final SchedWorkflowMapper workflowMapper;

    /**
     * Creates the manager: the job/depend mappers, id generator, registry, dispatcher and worker RPC
     * client are passed to {@link AbstractJobManager}; the instance/task/workflow mappers and the
     * transaction template are kept by this class.
     */
    public DistributedJobManager(SchedJobMapper schedJobMapper,
                                 SchedDependMapper schedDependMapper,
                                 SchedInstanceMapper schedInstanceMapper,
                                 SchedTaskMapper schedTaskMapper,
                                 SchedWorkflowMapper schedWorkflowMapper,
                                 IdGenerator idGenerator,
                                 SupervisorRegistry supervisorRegistry,
                                 TaskDispatcher taskDispatcher,
                                 WorkerCoreRpcClient workerCoreRpcClient,
                                 @Qualifier("disjobTransactionTemplate") TransactionTemplate transactionTemplate) {
        super(schedJobMapper, schedDependMapper, idGenerator, supervisorRegistry, taskDispatcher, workerCoreRpcClient);
        this.instanceMapper = schedInstanceMapper;
        this.taskMapper = schedTaskMapper;
        this.workflowMapper = schedWorkflowMapper;
        this.transactionTemplate = transactionTemplate;
    }

    /**
     * Renews the update time of the given instance, passing the instance's current version to the mapper.
     *
     * @param schedInstance the instance whose id and version are sent to the mapper
     * @param date          the new update time
     * @return {@code true} if exactly one row was affected
     */
    public boolean renewInstanceUpdateTime(SchedInstance schedInstance, Date date) {
        long instanceId = schedInstance.getInstanceId();
        int version = schedInstance.getVersion();
        return isOneAffectedRow(instanceMapper.renewUpdateTime(instanceId, date, version));
    }

    /**
     * Moves a task from {@code WAITING} to {@code BROADCAST_ABORTED}; no worker, end time or
     * error message is supplied to the mapper.
     *
     * @param j the task id
     * @return {@code true} if exactly one row was affected
     */
    @Override
    protected boolean cancelWaitingTask(long j) {
        int toState = ExecuteState.BROADCAST_ABORTED.value();
        int fromState = ExecuteState.WAITING.value();
        return isOneAffectedRow(taskMapper.terminate(j, null, toState, fromState, null, null));
    }

    /**
     * Stores a savepoint value for a task and asserts that exactly one row was affected.
     *
     * @param j   the task id
     * @param str the savepoint content
     */
    public void savepoint(long j, String str) {
        assertOneAffectedRow(
            taskMapper.savepoint(j, str),
            () -> "Save point failed: " + j + " | " + str
        );
    }

    /**
     * Manually triggers a job: builds a {@link RunType#MANUAL} trigger instance for the current time,
     * persists it, and dispatches it once the surrounding transaction has committed.
     *
     * @param j the job id
     * @throws JobException declared by the signature; propagated from the creator calls
     * @throws IllegalArgumentException if no job exists for the id
     */
    @Transactional(transactionManager = "disjobTransactionManager", rollbackFor = {Exception.class})
    public void triggerJob(long j) throws JobException {
        SchedJob job = jobMapper.get(j);
        Assert.notNull(job, () -> "Sched job not found: " + j);

        TriggerInstanceCreator<?> creator = TriggerInstanceCreator.of(job.getJobType(), this);
        TriggerInstance triggerInstance = creator.create(job, RunType.MANUAL, System.currentTimeMillis());
        createInstance(triggerInstance);

        // Dispatch is deferred until after commit.
        TransactionUtils.doAfterTransactionCommit(() -> {
            creator.dispatch(job, triggerInstance);
        });
    }

    /**
     * Updates the job's next trigger time and, only if that update affected a row, persists the
     * trigger instance.
     *
     * @param schedJob        the job whose next trigger time is updated
     * @param triggerInstance the instance to persist
     * @return {@code false} if the next-trigger-time update affected no rows, otherwise {@code true}
     */
    @Transactional(transactionManager = "disjobTransactionManager", rollbackFor = {Exception.class})
    public boolean createInstance(SchedJob schedJob, TriggerInstance triggerInstance) {
        boolean triggerTimeUpdated = jobMapper.updateNextTriggerTime(schedJob) != 0;
        if (triggerTimeUpdated) {
            createInstance(triggerInstance);
        }
        return triggerTimeUpdated;
    }

    /**
     * Batch-updates the worker of the given tasks, in task-id order, 200 entries per batch.
     *
     * <p>The sort is applied to a private copy, so the caller's list is neither reordered nor
     * required to be modifiable.
     *
     * @param list the task/worker pairs; {@code null} or empty is a no-op
     */
    @Transactional(transactionManager = "disjobTransactionManager", rollbackFor = {Exception.class})
    public void updateTaskWorker(List<TaskWorkerParam> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        // NOTE(review): ordering by task id presumably keeps row-lock order consistent across
        // concurrent batches - confirm against the mapper SQL.
        List<TaskWorkerParam> sorted = new ArrayList<>(list);
        sorted.sort(Comparator.comparing(TaskWorkerParam::getTaskId));
        Collects.batchProcess(sorted, taskMapper::batchUpdateWorker, 200);
    }

    /**
     * Marks a task as started on a worker and, if the owning instance is still {@code WAITING},
     * starts the instance as well.
     *
     * @param startTaskParam carries the instance id, task id and worker
     * @return {@code true} if the task row was updated; {@code false} if the instance is not in a
     *         pausable run state or the task update affected no rows
     * @throws IllegalArgumentException if the instance does not exist
     * @throws IllegalStateException if the instance was started but the task was not
     */
    @Transactional(transactionManager = "disjobTransactionManager", rollbackFor = {Exception.class})
    public boolean startTask(StartTaskParam startTaskParam) {
        SchedInstance instance = instanceMapper.get(startTaskParam.getInstanceId());
        Assert.notNull(instance, () -> "Sched instance not found: " + startTaskParam);
        if (!RUN_STATE_PAUSABLE.contains(instance.getRunState())) {
            return false;
        }

        Date now = new Date();
        int instanceRows = RunState.WAITING.equals(instance.getRunState())
            ? instanceMapper.start(startTaskParam.getInstanceId(), now)
            : 0;
        int taskRows = taskMapper.start(startTaskParam.getTaskId(), startTaskParam.getWorker(), now);
        if (taskRows == 0) {
            // The instance must not have been started if its task could not be.
            Assert.state(instanceRows == 0, () -> "Start task failed: " + startTaskParam);
            return false;
        }
        return true;
    }

    /**
     * Forces a non-workflow instance and its tasks into the given state, inside the instance lock
     * and transaction. When the target is {@code WAITING}, the tasks are dispatched after commit.
     *
     * @param j            the instance id
     * @param executeState the target execute state; must not be {@code EXECUTING}
     * @throws IllegalArgumentException if the target is {@code EXECUTING}, the instance is missing,
     *                                  or the instance is a workflow instance
     * @throws IllegalStateException if neither the instance nor any task row was updated
     */
    public void changeInstanceState(long j, ExecuteState executeState) {
        RunState targetRunState = executeState.runState();
        Assert.isTrue(executeState != ExecuteState.EXECUTING, "Cannot force update state to EXECUTING");

        doTransactionLockInSynchronized(j, (Long) null, instance -> {
            Assert.notNull(instance, () -> "Sched instance not found: " + j);
            Assert.isTrue(!instance.isWorkflow(), () -> "Unsupported force change workflow instance state: " + j);

            int instanceRows = instanceMapper.changeState(j, targetRunState.value());
            int taskRows = taskMapper.changeState(j, executeState.value());
            if (instanceRows == 0 && taskRows == 0) {
                throw new IllegalStateException("Force update instance state failed: " + j);
            }

            if (executeState == ExecuteState.WAITING) {
                Tuple3<SchedJob, SchedInstance, List<SchedTask>> params = buildDispatchParams(j, taskRows);
                TransactionUtils.doAfterTransactionCommit(() -> {
                    super.dispatch(params.a, params.b, params.c);
                });
            }
            LOG.info("Force change state success {} | {}", j, executeState);
        });
    }

    /**
     * Deletes a terminal instance together with its tasks. For a workflow, only the lead instance
     * may be deleted; its node instances, their tasks and the workflow rows are removed as well.
     *
     * @param j the instance id
     * @throws IllegalArgumentException if the instance is missing, not terminal, or a workflow node
     */
    public void deleteInstance(long j) {
        Long wnstanceId = instanceMapper.getWnstanceId(j);
        doTransactionLockInSynchronized(j, wnstanceId, instance -> {
            Assert.notNull(instance, () -> "Sched instance not found: " + j);
            Assert.isTrue(
                RunState.of(instance.getRunState()).isTerminal(),
                () -> "Deleting instance must be terminal: " + instance
            );

            if (!instance.isWorkflow()) {
                assertOneAffectedRow(instanceMapper.deleteByInstanceId(j), () -> "Delete sched instance conflict: " + j);
                assertManyAffectedRow(taskMapper.deleteByInstanceId(j), () -> "Delete sched task conflict: " + j);
            } else {
                Assert.isTrue(instance.isWorkflowLead(), () -> "Cannot delete workflow node instance: " + j);
                assertOneAffectedRow(instanceMapper.deleteByInstanceId(j), () -> "Delete workflow lead instance conflict: " + j);

                // Remove the tasks of every node instance before the node instances themselves.
                long leadId = instance.getWnstanceId();
                for (SchedInstance node : instanceMapper.findWorkflowNode(leadId)) {
                    long nodeInstanceId = node.getInstanceId();
                    assertManyAffectedRow(taskMapper.deleteByInstanceId(nodeInstanceId), () -> "Delete sched task conflict: " + j);
                }
                assertManyAffectedRow(instanceMapper.deleteByWnstanceId(j), () -> "Delete workflow node instance conflict: " + j);
                assertManyAffectedRow(workflowMapper.deleteByWnstanceId(j), () -> "Delete sched workflow conflict: " + j);
            }
            LOG.info("Delete sched instance success {}", j);
        });
    }

    /**
     * Moves an {@code EXECUTING} task to the requested state and, if the task states now determine
     * an instance state, terminates the instance and runs the follow-up (after-terminate hook for
     * trigger operations, otherwise workflow edge/lead updates for workflow nodes).
     *
     * @param terminateTaskParam task id, instance id, optional workflow lead id, worker, target
     *                           state, operation and error message
     * @return {@code true} if the task row was updated; {@code false} if the instance is already
     *         terminal or the task update conflicted
     * @throws IllegalArgumentException if the worker is blank, the target state is pausable, the
     *                                  instance is missing, or the instance is a workflow lead
     */
    public boolean terminateTask(TerminateTaskParam terminateTaskParam) {
        Assert.hasText(terminateTaskParam.getWorker(), "Terminate task worker cannot be blank.");
        ExecuteState toState = terminateTaskParam.getToState();
        long instanceId = terminateTaskParam.getInstanceId();
        Assert.isTrue(!ExecuteState.PAUSABLE_LIST.contains(toState), () -> "Stop executing invalid to state " + toState);

        return doTransactionLockInSynchronized(instanceId, terminateTaskParam.getWnstanceId(), instance -> {
            Assert.notNull(instance, () -> "Terminate executing task failed, instance not found: " + instanceId);
            Assert.isTrue(!instance.isWorkflowLead(), () -> "Cannot direct terminate workflow lead instance: " + instance);
            if (RunState.of(instance.getRunState()).isTerminal()) {
                return false;
            }

            int taskRows = taskMapper.terminate(
                terminateTaskParam.getTaskId(),
                terminateTaskParam.getWorker(),
                toState.value(),
                ExecuteState.EXECUTING.value(),
                toState.isTerminal() ? new Date() : null,
                terminateTaskParam.getErrorMsg()
            );
            if (!isOneAffectedRow(taskRows)) {
                LOG.warn("Conflict terminate executing task: {} | {}", terminateTaskParam.getTaskId(), toState);
                return false;
            }

            Tuple2<RunState, Date> target = obtainRunState(taskMapper.findBaseByInstanceId(instanceId));
            boolean instanceTerminated = target != null
                && instanceMapper.terminate(instanceId, target.a.value(), RUN_STATE_TERMINABLE, target.b) > 0;
            if (instanceTerminated) {
                if (terminateTaskParam.getOperation().isTrigger()) {
                    instance.markTerminated(target.a, target.b);
                    afterTerminateTask(instance);
                } else if (instance.isWorkflowNode()) {
                    updateWorkflowEdgeState(instance, target.a.value(), RUN_STATE_TERMINABLE);
                    long leadId = terminateTaskParam.getWnstanceId();
                    updateWorkflowLeadState(instanceMapper.get(leadId));
                }
            }
            return true;
        });
    }

    /**
     * Forcibly terminates an instance that has no waiting task and no alive executing task: the
     * instance is moved to the state derived from its tasks (or {@code CANCELED} when none can be
     * derived) and every still-pausable task is marked {@code EXECUTE_TIMEOUT}.
     *
     * @param schedInstance supplies the instance id and workflow lead id used for locking
     * @return {@code true} if the instance was purged; {@code false} if it is not in a pausable run
     *         state, still has waiting or alive executing tasks, or the terminate update conflicted
     * @throws IllegalArgumentException if the instance is missing, is a workflow lead, or the
     *                                  derived state is not terminal
     */
    public boolean purgeInstance(SchedInstance schedInstance) {
        Long instanceId = schedInstance.getInstanceId();
        return doTransactionLockInSynchronized(instanceId.longValue(), schedInstance.getWnstanceId(), instance -> {
            Assert.notNull(instance, () -> "Purge instance not found: " + instanceId);
            Assert.isTrue(!instance.isWorkflowLead(), () -> "Cannot purge workflow lead instance: " + instance);
            if (!RUN_STATE_PAUSABLE.contains(instance.getRunState())) {
                return false;
            }

            List<SchedTask> tasks = taskMapper.findBaseByInstanceId(instanceId.longValue());
            boolean hasWaiting = tasks.stream().anyMatch(task -> ExecuteState.WAITING.equals(task.getExecuteState()));
            if (hasWaiting) {
                LOG.warn("Purge instance failed, has waiting task: {}", tasks);
                return false;
            }
            if (hasAliveExecuting(tasks)) {
                LOG.warn("Purge instance failed, has alive executing task: {}", tasks);
                return false;
            }

            Tuple2<RunState, Date> derived = obtainRunState(tasks);
            final Tuple2<RunState, Date> target;
            if (derived == null) {
                // No state can be derived from the tasks: fall back to CANCELED, ending now.
                target = Tuple2.of(RunState.CANCELED, new Date());
            } else {
                Assert.isTrue(derived.a.isTerminal(), () -> "Purge instance state must be terminal state: " + instance);
                target = derived;
            }

            int instanceRows = instanceMapper.terminate(instanceId.longValue(), target.a.value(), RUN_STATE_TERMINABLE, target.b);
            if (!isOneAffectedRow(instanceRows)) {
                return false;
            }

            for (SchedTask task : tasks) {
                if (!EXECUTE_STATE_PAUSABLE.contains(task.getExecuteState())) {
                    continue;
                }
                long taskId = task.getTaskId();
                // Only an EXECUTING task is required to carry a worker.
                String worker = ExecuteState.EXECUTING.equals(task.getExecuteState())
                    ? Strings.requireNonBlank(task.getWorker())
                    : null;
                int fromState = task.getExecuteState();
                taskMapper.terminate(taskId, worker, ExecuteState.EXECUTE_TIMEOUT.value(), fromState, new Date(), null);
            }

            instance.markTerminated(target.a, target.b);
            afterTerminateTask(instance);
            LOG.warn("Purge instance {} to state {}", instanceId, target.a);
            return true;
        });
    }

    /**
     * Pauses an instance. For a workflow, the id must be the lead instance: waiting workflow rows
     * are set to {@code PAUSED}, every pausable node instance is paused, and the lead state is
     * then recomputed.
     *
     * @param j the instance id (the lead instance id for a workflow)
     * @return {@code true} if the pause was applied; {@code false} if the instance is not in a
     *         pausable run state
     * @throws IllegalArgumentException if the instance is missing or is a workflow node
     */
    public boolean pauseInstance(long j) {
        Long wnstanceId = instanceMapper.getWnstanceId(j);
        if (wnstanceId != null) {
            Assert.isTrue(j == wnstanceId.longValue(), () -> "Must pause lead workflow instance: " + j);
        }

        return doTransactionLockInSynchronized(j, wnstanceId, instance -> {
            Assert.notNull(instance, () -> "Pause instance not found: " + j);
            if (!RUN_STATE_PAUSABLE.contains(instance.getRunState())) {
                return false;
            }

            if (!instance.isWorkflow()) {
                pauseInstance(instance);
                return true;
            }

            Assert.isTrue(instance.isWorkflowLead(), () -> "Cannot pause workflow node instance: " + j);
            workflowMapper.update(j, null, RunState.PAUSED.value(), null, RUN_STATE_WAITING, null);
            // The inner lambda parameter must not reuse the enclosing lambda's parameter name.
            instanceMapper.findWorkflowNode(j).stream()
                .filter(node -> RUN_STATE_PAUSABLE.contains(node.getRunState()))
                .forEach(this::pauseInstance);
            updateWorkflowLeadState(instance);
            return true;
        });
    }

    /**
     * Cancels an instance. For a workflow, the id must be the lead instance: waiting workflow rows
     * are set to {@code CANCELED}, every non-terminal node instance is cancelled, and the lead
     * state is then recomputed.
     *
     * @param j          the instance id (the lead instance id for a workflow)
     * @param operations the cancelling operation; its target state must be a failure state
     * @return {@code true} if the cancel was applied; {@code false} if the instance is already terminal
     * @throws IllegalArgumentException if the operation is invalid, the instance is missing, or it
     *                                  is a workflow node
     */
    public boolean cancelInstance(long j, Operations operations) {
        Assert.isTrue(operations.toState().isFailure(), () -> "Cancel instance operation invalid: " + operations);
        Long wnstanceId = instanceMapper.getWnstanceId(j);
        if (wnstanceId != null) {
            Assert.isTrue(j == wnstanceId.longValue(), () -> "Must cancel lead workflow instance: " + j);
        }

        return doTransactionLockInSynchronized(j, wnstanceId, instance -> {
            Assert.notNull(instance, () -> "Cancel instance not found: " + j);
            if (RunState.of(instance.getRunState()).isTerminal()) {
                return false;
            }

            if (!instance.isWorkflow()) {
                cancelInstance(instance, operations);
                return true;
            }

            Assert.isTrue(instance.isWorkflowLead(), () -> "Cannot cancel workflow node instance: " + j);
            workflowMapper.update(j, null, RunState.CANCELED.value(), null, RUN_STATE_WAITING, null);
            // The inner lambda parameter must not reuse the enclosing lambda's parameter name.
            instanceMapper.findWorkflowNode(j).stream()
                .filter(node -> !RunState.of(node.getRunState()).isTerminal())
                .forEach(node -> cancelInstance(node, operations));
            updateWorkflowLeadState(instance);
            return true;
        });
    }

    /**
     * Resumes a {@code PAUSED} instance. For a workflow lead, the lead is set back to
     * {@code RUNNING}, waiting workflow rows are resumed, every paused node instance is resumed
     * (and its edge set to {@code RUNNING}), and workflow node creation is then invoked on the
     * reloaded graph.
     *
     * @param j the instance id (the lead instance id for a workflow)
     * @return {@code true} if the resume was applied; {@code false} if the instance is not paused
     * @throws IllegalArgumentException if the instance is missing or is a workflow node
     */
    public boolean resumeInstance(long j) {
        Long wnstanceId = instanceMapper.getWnstanceId(j);
        return doTransactionLockInSynchronized(j, wnstanceId, instance -> {
            Assert.notNull(instance, () -> "Resume failed, instance_id not found: " + j);
            if (!RunState.PAUSED.equals(instance.getRunState())) {
                return false;
            }

            if (!instance.isWorkflow()) {
                resumeInstance(instance);
                return true;
            }

            Assert.isTrue(instance.isWorkflowLead(), () -> "Cannot resume workflow node instance: " + j);
            assertOneAffectedRow(
                instanceMapper.updateState(j, RunState.RUNNING.value(), RunState.PAUSED.value()),
                () -> "Resume workflow lead instance failed: " + j
            );
            workflowMapper.resumeWaiting(j);

            // The loop variable must not reuse the lambda parameter name; "instance" stays the lead.
            for (SchedInstance node : instanceMapper.findWorkflowNode(j)) {
                if (RunState.PAUSED.equals(node.getRunState())) {
                    resumeInstance(node);
                    updateWorkflowEdgeState(node, RunState.RUNNING.value(), RUN_STATE_PAUSED);
                }
            }

            WorkflowGraph graph = new WorkflowGraph(workflowMapper.findByWnstanceId(wnstanceId.longValue()));
            createWorkflowNode(instance, graph, graph.map(), ExceptionUtils::rethrow);
            return true;
        });
    }

    /**
     * Result-less variant of the locking helper: the consumer is adapted via
     * {@code Functions.convert(consumer, Boolean.TRUE)} and the boolean outcome is discarded.
     *
     * @param j        the instance id
     * @param l        the workflow lead instance id, or {@code null}
     * @param consumer the action to run with the loaded instance
     */
    private void doTransactionLockInSynchronized(long j, Long l, Consumer<SchedInstance> consumer) {
        Function<SchedInstance, Boolean> action = Functions.convert(consumer, Boolean.TRUE);
        doTransactionLockInSynchronized(j, l, action);
    }

    /**
     * Runs {@code function} inside a transaction while holding a JVM monitor and the mapper's
     * {@code lock(...)} on the instance. The locked id is the workflow lead id {@code l} when
     * given, otherwise the instance id {@code j}.
     *
     * @param j        the id of the instance handed to {@code function}
     * @param l        the workflow lead instance id, or {@code null}
     * @param function the action to run with the loaded instance
     * @return {@code true} only if the action returned {@code Boolean.TRUE}
     * @throws IllegalArgumentException if the locked or target instance does not exist, or the
     *                                  instance's workflow lead id differs from {@code l}
     */
    private boolean doTransactionLockInSynchronized(long j, Long l, Function<SchedInstance, Boolean> function) {
        Long lockInstanceId = (l != null) ? l : Long.valueOf(j);
        // The interner maps equal ids to one canonical Long, which serves as the monitor.
        synchronized (INTERNER_POOL.intern(lockInstanceId)) {
            Boolean result = transactionTemplate.execute(status -> {
                SchedInstance locked = instanceMapper.lock(lockInstanceId.longValue());
                Assert.notNull(locked, () -> "Lock instance not found: " + lockInstanceId);

                SchedInstance instance = (j == lockInstanceId.longValue()) ? locked : instanceMapper.get(j);
                // Report the requested id: the instance itself is null when this assertion fails.
                Assert.notNull(instance, () -> "Instance not found: " + j);

                if (!Objects.equals(instance.getWnstanceId(), l)) {
                    throw new IllegalArgumentException("Invalid workflow instance id: " + l + ", " + instance);
                }
                return function.apply(instance);
            });
            return Boolean.TRUE.equals(result);
        }
    }

    /**
     * Derives the instance run state from the execute states of its tasks.
     *
     * <ul>
     *   <li>All tasks terminal: {@code CANCELED} if any task failed, otherwise {@code FINISHED},
     *       paired with the latest non-null task end time (or the current time if there is none).</li>
     *   <li>Otherwise, if any task is in {@code ExecuteState.PAUSABLE_LIST}: {@code null}.</li>
     *   <li>Otherwise: {@code PAUSED} with a {@code null} date.</li>
     * </ul>
     *
     * @param list the tasks of one instance
     * @return the derived state and end time, or {@code null} when no state can be derived yet
     */
    private Tuple2<RunState, Date> obtainRunState(List<SchedTask> list) {
        List<ExecuteState> states = list.stream()
            .map(SchedTask::getExecuteState)
            .map(ExecuteState::of)
            .collect(Collectors.toList());

        if (states.stream().allMatch(ExecuteState::isTerminal)) {
            RunState runState = states.stream().anyMatch(ExecuteState::isFailure)
                ? RunState.CANCELED
                : RunState.FINISHED;
            Date endTime = list.stream()
                .map(SchedTask::getExecuteEndTime)
                .filter(Objects::nonNull)
                .max(Comparator.naturalOrder())
                .orElseGet(Date::new);
            return Tuple2.of(runState, endTime);
        }

        if (states.stream().anyMatch(ExecuteState.PAUSABLE_LIST::contains)) {
            return null;
        }
        return Tuple2.of(RunState.PAUSED, (Date) null);
    }

    /**
     * Persists a trigger instance. A normal instance stores its tasks; a workflow instance stores
     * its workflow rows and then each node instance with that node's tasks. Batch inserts use
     * chunks of 200.
     *
     * @param triggerInstance the instance to persist
     * @throws UnsupportedOperationException if the instance is neither a normal nor a workflow instance
     */
    private void createInstance(TriggerInstance triggerInstance) {
        instanceMapper.insert(triggerInstance.getInstance());

        if (triggerInstance instanceof NormalInstanceCreator.NormalInstance) {
            NormalInstanceCreator.NormalInstance normal = (NormalInstanceCreator.NormalInstance) triggerInstance;
            Collects.batchProcess(normal.getTasks(), taskMapper::batchInsert, 200);
        } else if (triggerInstance instanceof WorkflowInstanceCreator.WorkflowInstance) {
            WorkflowInstanceCreator.WorkflowInstance workflow = (WorkflowInstanceCreator.WorkflowInstance) triggerInstance;
            Collects.batchProcess(workflow.getWorkflows(), workflowMapper::batchInsert, 200);
            for (Tuple2<SchedInstance, List<SchedTask>> node : workflow.getNodeInstances()) {
                instanceMapper.insert(node.a);
                Collects.batchProcess(node.b, taskMapper::batchInsert, 200);
            }
        } else {
            throw new UnsupportedOperationException("Unknown instance creator type: " + triggerInstance.getClass());
        }
    }

    /**
     * Pauses one instance: its {@code WAITING} tasks are moved to the pause target state; if tasks
     * are still executing, the pause is dispatched after commit, otherwise the instance state is
     * derived from the tasks and written immediately.
     *
     * @param schedInstance the instance to pause; must be in a pausable run state
     * @throws IllegalArgumentException if the instance is not pausable or no state can be derived
     */
    private void pauseInstance(SchedInstance schedInstance) {
        Assert.isTrue(
            RUN_STATE_PAUSABLE.contains(schedInstance.getRunState()),
            () -> "Invalid pause instance state: " + schedInstance
        );
        long instanceId = schedInstance.getInstanceId();
        Operations ops = Operations.PAUSE;

        taskMapper.updateStateByInstanceId(instanceId, ops.toState().value(), EXECUTE_STATE_WAITING, null);

        List<ExecuteTaskParam> executingTasks = loadExecutingTasks(schedInstance, ops);
        if (executingTasks.isEmpty()) {
            Tuple2<RunState, Date> target = obtainRunState(taskMapper.findBaseByInstanceId(instanceId));
            Assert.notNull(target, () -> "Pause instance failed: " + instanceId);

            int rows = instanceMapper.terminate(instanceId, target.a.value(), RUN_STATE_PAUSABLE, target.b);
            assertOneAffectedRow(rows, () -> "Pause instance failed: " + schedInstance + " | " + target.a);
            if (schedInstance.isWorkflowNode()) {
                updateWorkflowEdgeState(schedInstance, target.a.value(), RUN_STATE_PAUSABLE);
            }
        } else {
            // Executing tasks are handled via dispatch once the transaction has committed.
            TransactionUtils.doAfterTransactionCommit(() -> {
                super.dispatch(executingTasks);
            });
        }
    }

    /**
     * Cancels one instance: its executable tasks are moved to the operation's target state; if
     * tasks are still executing, the cancel is dispatched after commit, otherwise the instance is
     * terminated with the state derived from the tasks ({@code PAUSED} is replaced by
     * {@code CANCELED}).
     *
     * @param schedInstance the instance to cancel
     * @param operations    the cancelling operation supplying the task target state
     * @throws IllegalArgumentException if no instance state can be derived from the tasks
     */
    private void cancelInstance(SchedInstance schedInstance, Operations operations) {
        long instanceId = schedInstance.getInstanceId();
        taskMapper.updateStateByInstanceId(instanceId, operations.toState().value(), EXECUTE_STATE_EXECUTABLE, new Date());

        List<ExecuteTaskParam> executingTasks = loadExecutingTasks(schedInstance, operations);
        if (!executingTasks.isEmpty()) {
            // Executing tasks are handled via dispatch once the transaction has committed.
            TransactionUtils.doAfterTransactionCommit(() -> {
                super.dispatch(executingTasks);
            });
            return;
        }

        Tuple2<RunState, Date> derived = obtainRunState(taskMapper.findBaseByInstanceId(instanceId));
        Assert.notNull(derived, () -> "Cancel instance failed: " + instanceId);
        // A cancel must end in a terminal state, so PAUSED is replaced by CANCELED.
        Tuple2<RunState, Date> target = (derived.a == RunState.PAUSED)
            ? Tuple2.of(RunState.CANCELED, new Date())
            : derived;

        RunState toRunState = target.a;
        int rows = instanceMapper.terminate(instanceId, toRunState.value(), RUN_STATE_TERMINABLE, target.b);
        assertOneAffectedRow(rows, () -> "Cancel instance failed: " + schedInstance + " | " + toRunState);
        if (schedInstance.isWorkflowNode()) {
            updateWorkflowEdgeState(schedInstance, toRunState.value(), RUN_STATE_TERMINABLE);
        }
    }

    /**
     * Resumes one paused instance: the instance goes from {@code PAUSED} to {@code WAITING}, its
     * paused tasks go to {@code WAITING}, and the tasks are dispatched after commit.
     *
     * @param schedInstance the paused instance to resume
     */
    private void resumeInstance(SchedInstance schedInstance) {
        long instanceId = schedInstance.getInstanceId();

        int instanceRows = instanceMapper.updateState(instanceId, RunState.WAITING.value(), RunState.PAUSED.value());
        assertOneAffectedRow(instanceRows, "Resume sched instance failed.");

        int taskRows = taskMapper.updateStateByInstanceId(instanceId, ExecuteState.WAITING.value(), EXECUTE_STATE_PAUSED, null);
        assertManyAffectedRow(taskRows, "Resume sched task failed.");

        Tuple3<SchedJob, SchedInstance, List<SchedTask>> params = buildDispatchParams(instanceId, taskRows);
        TransactionUtils.doAfterTransactionCommit(() -> {
            super.dispatch(params.a, params.b, params.c);
        });
    }

    /**
     * Recomputes the state of a workflow lead instance from its workflow rows: after refreshing
     * the end node, the lead is terminated when every edge is terminal ({@code CANCELED} on any
     * failure, else {@code FINISHED}); otherwise it is set to {@code PAUSED} when no workflow row
     * is {@code RUNNING}.
     *
     * @param schedInstance the workflow lead instance
     * @throws IllegalArgumentException if the instance is not a workflow lead
     */
    private void updateWorkflowLeadState(SchedInstance schedInstance) {
        Assert.isTrue(schedInstance.isWorkflowLead(), () -> "Must terminate workflow lead instance: " + schedInstance);

        long wnstanceId = schedInstance.getWnstanceId();
        List<SchedWorkflow> workflows = workflowMapper.findByWnstanceId(wnstanceId);
        WorkflowGraph graph = new WorkflowGraph(workflows);
        updateWorkflowEndState(graph);

        if (graph.allMatch(edge -> edge.getValue().isTerminal())) {
            boolean anyFailed = graph.anyMatch(edge -> edge.getValue().isFailure());
            RunState terminalState = anyFailed ? RunState.CANCELED : RunState.FINISHED;
            assertOneAffectedRow(
                instanceMapper.terminate(wnstanceId, terminalState.value(), RUN_STATE_TERMINABLE, new Date()),
                () -> "Update workflow lead instance state failed: " + schedInstance + " | " + terminalState
            );
            return;
        }

        boolean anyRunning = workflows.stream().anyMatch(workflow -> RunState.RUNNING.equals(workflow.getRunState()));
        if (!anyRunning) {
            RunState pausedState = RunState.PAUSED;
            int fromState = schedInstance.getRunState();
            assertOneAffectedRow(
                instanceMapper.updateState(wnstanceId, pausedState.value(), fromState),
                () -> "Update workflow lead instance state failed: " + schedInstance + " | " + pausedState
            );
        }
    }

    /**
     * Updates the workflow row(s) of the instance's current node to the given run state and
     * asserts that at least one row was affected.
     *
     * @param schedInstance the workflow node instance
     * @param num           the target run state value
     * @param list          the run state values passed to the mapper as the from-state filter
     * @throws IllegalArgumentException if no workflow row was updated
     */
    private void updateWorkflowEdgeState(SchedInstance schedInstance, Integer num, List<Integer> list) {
        boolean updated = workflowMapper.update(
            schedInstance.getWnstanceId().longValue(),
            schedInstance.parseAttach().getCurNode(),
            num,
            null,
            list,
            schedInstance.getInstanceId()
        ) > 0;
        Assert.isTrue(updated, () -> "Update workflow state failed: " + schedInstance + " | " + num);
    }

    /**
     * Settles the state of the workflow end node once everything leading into it is terminal.
     * <p>
     * Nothing happens unless the graph still contains an edge targeting the end node whose row
     * is not terminal, and all rows returned by {@code predecessors(DAGNode.END)} are terminal.
     * In that case the end node is persisted as {@code CANCELED} if any of those rows is a
     * failure, otherwise {@code FINISHED}, and the same state is written into the in-memory
     * graph so that callers inspecting {@code workflowGraph} afterwards see the new value.
     *
     * @param workflowGraph the workflow graph to inspect and update in place
     */
    private void updateWorkflowEndState(WorkflowGraph workflowGraph) {
        long wnstanceId = ((SchedWorkflow) Collects.getFirst(workflowGraph.map().values())).getWnstanceId().longValue();

        boolean endPending = workflowGraph.anyMatch(
            e -> ((DAGEdge) e.getKey()).getTarget().isEnd() && !((SchedWorkflow) e.getValue()).isTerminal()
        );
        if (!endPending) {
            return;
        }

        Map<DAGEdge, SchedWorkflow> ends = workflowGraph.predecessors(DAGNode.END);
        if (!ends.values().stream().allMatch(SchedWorkflow::isTerminal)) {
            return;
        }

        boolean anyFailure = ends.values().stream().anyMatch(SchedWorkflow::isFailure);
        RunState endState = anyFailure ? RunState.CANCELED : RunState.FINISHED;
        boolean updated = this.workflowMapper.update(
            wnstanceId, DAGNode.END.toString(), Integer.valueOf(endState.value()), null, RUN_STATE_TERMINABLE, null
        ) > 0;
        Assert.isTrue(updated, () -> "Update workflow end node failed: " + wnstanceId + " | " + endState);

        // Mirror the persisted state into the graph held in memory.
        for (DAGEdge edge : ends.keySet()) {
            workflowGraph.get(edge.getTarget(), DAGNode.END).setRunState(Integer.valueOf(endState.value()));
        }
    }

    /**
     * Starts the workflow nodes reachable through the given edges whose prerequisites are met.
     * <p>
     * For every edge whose target is not the end node, whose row is {@code WAITING}, and whose
     * target has not already been handled in this call:
     * <ul>
     *   <li>if any predecessor row of the target is not terminal, the node is skipped;</li>
     *   <li>if any predecessor row is a failure, the node row is updated to {@code CANCELED};</li>
     *   <li>otherwise a new instance and its tasks are created, the node row is switched to
     *       {@code RUNNING} with the new instance id, and the dispatch is registered through
     *       {@code TransactionUtils.doAfterTransactionCommit}.</li>
     * </ul>
     * Any {@link Throwable} raised while creating a node is handed to {@code function}; when it
     * returns {@code Boolean.FALSE} the method stops, otherwise processing continues with the next edge.
     *
     * @param schedInstance the workflow lead instance supplying job id, run type and trigger time
     * @param workflowGraph the workflow graph used to look up predecessors
     * @param map           the candidate edges and their workflow rows
     * @param function      failure handler deciding whether to continue after an error
     */
    private void createWorkflowNode(SchedInstance schedInstance, WorkflowGraph workflowGraph, Map<DAGEdge, SchedWorkflow> map, Function<Throwable, Boolean> function) {
        // The job is resolved through a lazy loader bound to the job mapper lookup.
        SchedJob job = LazyLoader.of(SchedJob.class, this.jobMapper::get, schedInstance.getJobId());
        long wnstanceId = schedInstance.getWnstanceId().longValue();
        Date now = new Date();
        // Several edges may share a target; each target node is processed at most once.
        HashSet<DAGNode> handledTargets = new HashSet<>();

        for (Map.Entry<DAGEdge, SchedWorkflow> entry : map.entrySet()) {
            DAGNode target = entry.getKey().getTarget();
            SchedWorkflow workflow = entry.getValue();
            if (target.isEnd() || !RunState.WAITING.equals(workflow.getRunState()) || !handledTargets.add(target)) {
                continue;
            }

            Map<DAGEdge, SchedWorkflow> predecessors = workflowGraph.predecessors(target);
            if (predecessors.values().stream().anyMatch(p -> !RunState.of(p.getRunState()).isTerminal())) {
                // Some prerequisite is still in progress.
                continue;
            }

            if (predecessors.values().stream().anyMatch(p -> RunState.of(p.getRunState()).isFailure())) {
                RunState canceled = RunState.CANCELED;
                boolean updated = this.workflowMapper.update(
                    wnstanceId, workflow.getCurNode(), Integer.valueOf(canceled.value()), null, RUN_STATE_TERMINABLE, null
                ) > 0;
                Assert.isTrue(updated, () -> "Update workflow cur node state failed: " + workflow + " | " + canceled);
                continue;
            }

            try {
                long nodeInstanceId = generateId();
                List<SchedTask> tasks = splitTasks(JobHandlerParam.from(job, target.getName()), nodeInstanceId, new Date());
                long triggerTime = schedInstance.getTriggerTime().longValue() + workflow.getSequence().intValue();
                SchedInstance nodeInstance = SchedInstance.create(
                    nodeInstanceId, job.getJobId().longValue(), RunType.of(schedInstance.getRunType()), triggerTime, 0, now
                );
                nodeInstance.setRnstanceId(Long.valueOf(wnstanceId));
                nodeInstance.setPnstanceId(
                    predecessors.isEmpty() ? null : ((SchedWorkflow) Collects.getFirst(predecessors.values())).getInstanceId()
                );
                nodeInstance.setWnstanceId(Long.valueOf(wnstanceId));
                nodeInstance.setAttach(Jsons.toJson(new InstanceAttach(workflow.getCurNode())));

                boolean started = this.workflowMapper.update(
                    wnstanceId, workflow.getCurNode(), Integer.valueOf(RunState.RUNNING.value()), Long.valueOf(nodeInstanceId), RUN_STATE_WAITING, null
                ) > 0;
                Assert.isTrue(started, () -> "Start workflow node failed: " + workflow);

                this.instanceMapper.insert(nodeInstance);
                Collects.batchProcess(tasks, this.taskMapper::batchInsert, 200);
                TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(job, nodeInstance, tasks));
            } catch (Throwable th) {
                // The handler decides whether the remaining edges are still processed.
                if (Boolean.FALSE.equals(function.apply(th))) {
                    return;
                }
            }
        }
    }

    /**
     * Runs the follow-up actions for an instance that has reached a terminal run state.
     * <ul>
     *   <li>{@code CANCELED}: delegates to {@link #retryJob(SchedInstance, LazyLoader)};</li>
     *   <li>{@code FINISHED}: continues the workflow when the instance is a workflow node,
     *       otherwise triggers dependent jobs;</li>
     *   <li>any other state is rejected.</li>
     * </ul>
     * Afterwards {@link #updateFixedDelayNextTriggerTime(SchedInstance, LazyLoader)} is invoked
     * with the same lazily loaded job.
     *
     * @param schedInstance the terminated instance
     * @throws IllegalStateException if the run state is neither {@code CANCELED} nor {@code FINISHED}
     */
    private void afterTerminateTask(SchedInstance schedInstance) {
        RunState runState = RunState.of(schedInstance.getRunState());
        // Shared by the retry and fixed-delay steps; the loader is bound to the job mapper lookup.
        LazyLoader<SchedJob> lazyJob = LazyLoader.of(this.jobMapper::get, schedInstance.getJobId());

        if (runState == RunState.CANCELED) {
            retryJob(schedInstance, lazyJob);
        } else if (runState == RunState.FINISHED) {
            if (schedInstance.isWorkflowNode()) {
                processWorkflow(schedInstance);
            } else {
                dependJob(schedInstance);
            }
        } else {
            throw new IllegalStateException("Unknown terminate run state " + runState);
        }

        updateFixedDelayNextTriggerTime(schedInstance, lazyJob);
    }

    /**
     * Forwards the run end time of a terminated instance to the inherited
     * {@code updateFixedDelayNextTriggerTime(job, runEndTime)} when all preconditions hold.
     * <p>
     * The call is skipped when:
     * <ul>
     *   <li>the instance is a workflow node;</li>
     *   <li>the instance identified by {@code obtainRnstanceId()} belongs to a different job
     *       or its run type is not {@code SCHEDULE};</li>
     *   <li>the job cannot be loaded;</li>
     *   <li>the job reports the instance as still retryable for its run state and retried count.</li>
     * </ul>
     *
     * @param schedInstance the terminated instance
     * @param lazyLoader    lazy accessor for the job of {@code schedInstance}
     */
    private void updateFixedDelayNextTriggerTime(SchedInstance schedInstance, LazyLoader<SchedJob> lazyLoader) {
        if (schedInstance.isWorkflowNode()) {
            return;
        }

        long rnstanceId = schedInstance.obtainRnstanceId();
        // Avoid a database read when the instance is itself the one referenced by rnstanceId.
        SchedInstance root = (rnstanceId == schedInstance.getInstanceId().longValue())
            ? schedInstance
            : this.instanceMapper.get(rnstanceId);
        if (!root.getJobId().equals(schedInstance.getJobId()) || !RunType.SCHEDULE.equals(root.getRunType())) {
            return;
        }

        // The job is only loaded once the cheaper checks above have passed.
        SchedJob job = lazyLoader.orElse(null);
        if (job == null) {
            return;
        }
        if (job.retryable(RunState.of(schedInstance.getRunState()), schedInstance.obtainRetriedCount())) {
            return;
        }

        super.updateFixedDelayNextTriggerTime(job, schedInstance.getRunEndTime());
    }

    /**
     * Advances a workflow after one of its node instances has terminated.
     * <p>
     * Does nothing unless the instance is a workflow node. Otherwise the node's row receives the
     * instance's run state and, if that state is {@code CANCELED}, a further update writes
     * {@code CANCELED} using {@code RUN_STATE_RUNNABLE} as the expected current states. The graph
     * is then reloaded and the end node refreshed:
     * <ul>
     *   <li>if every row is terminal, the lead instance is terminated ({@code CANCELED} when any
     *       row is a failure, otherwise {@code FINISHED}) and its own post-termination handling runs;</li>
     *   <li>otherwise, unless this node was canceled, the successors of the current node are started;
     *       a failure there is logged and cancels the whole workflow.</li>
     * </ul>
     *
     * @param schedInstance the terminated instance
     */
    private void processWorkflow(SchedInstance schedInstance) {
        if (!schedInstance.isWorkflowNode()) {
            return;
        }

        RunState nodeState = RunState.of(schedInstance.getRunState());
        Long wnstanceId = schedInstance.getWnstanceId();
        boolean nodeCanceled = nodeState == RunState.CANCELED;

        updateWorkflowEdgeState(schedInstance, Integer.valueOf(nodeState.value()), RUN_STATE_TERMINABLE);
        if (nodeCanceled) {
            this.workflowMapper.update(wnstanceId.longValue(), null, Integer.valueOf(RunState.CANCELED.value()), null, RUN_STATE_RUNNABLE, null);
        }

        WorkflowGraph graph = new WorkflowGraph(this.workflowMapper.findByWnstanceId(wnstanceId.longValue()));
        updateWorkflowEndState(graph);

        if (graph.allMatch(e -> ((SchedWorkflow) e.getValue()).isTerminal())) {
            boolean anyFailure = graph.anyMatch(e -> ((SchedWorkflow) e.getValue()).isFailure());
            RunState leadState = anyFailure ? RunState.CANCELED : RunState.FINISHED;
            assertOneAffectedRow(
                this.instanceMapper.terminate(wnstanceId.longValue(), leadState.value(), RUN_STATE_TERMINABLE, new Date()),
                () -> "Terminate workflow lead instance failed: " + schedInstance + " | " + leadState
            );
            afterTerminateTask(this.instanceMapper.get(wnstanceId.longValue()));
            return;
        }

        if (nodeCanceled) {
            return;
        }

        SchedInstance leadInstance = this.instanceMapper.get(wnstanceId.longValue());
        Map<DAGEdge, SchedWorkflow> successors = graph.successors(DAGNode.fromString(schedInstance.parseAttach().getCurNode()));
        createWorkflowNode(leadInstance, graph, successors, th -> {
            LOG.error("Split workflow job task error: " + schedInstance, th);
            onCreateWorkflowNodeFailed(schedInstance.getWnstanceId());
            return false;
        });
    }

    /**
     * Cancels a whole workflow after a node could not be created.
     * <p>
     * Writes {@code CANCELED} to the workflow's rows using {@code RUN_STATE_RUNNABLE} as the
     * expected current states, reloads the graph, refreshes the end node, requires every row
     * to be terminal, terminates the lead instance as {@code CANCELED} (exactly one row must be
     * affected) and finally runs the post-termination handling for the lead instance.
     *
     * @param l the workflow lead instance id
     */
    private void onCreateWorkflowNodeFailed(Long l) {
        int canceledValue = RunState.CANCELED.value();
        long wnstanceId = l.longValue();

        this.workflowMapper.update(wnstanceId, null, Integer.valueOf(canceledValue), null, RUN_STATE_RUNNABLE, null);

        WorkflowGraph graph = new WorkflowGraph(this.workflowMapper.findByWnstanceId(wnstanceId));
        updateWorkflowEndState(graph);
        boolean allTerminal = graph.allMatch(e -> ((SchedWorkflow) e.getValue()).isTerminal());
        Assert.state(allTerminal, "Workflow not all terminal.");

        assertOneAffectedRow(
            this.instanceMapper.terminate(wnstanceId, canceledValue, RUN_STATE_TERMINABLE, new Date()),
            () -> "Cancel workflow failed: " + l
        );
        afterTerminateTask(this.instanceMapper.get(wnstanceId));
    }

    /**
     * Creates a retry instance for a canceled instance, or falls back to workflow processing.
     * <p>
     * When the job cannot be loaded or reports the instance as not retryable,
     * {@link #processWorkflow(SchedInstance)} is called and no retry is created. Otherwise a new
     * instance with run type {@code RETRY} is built; for workflow nodes the node row is first
     * re-pointed to the new instance id. The tasks depend on the job's retry type:
     * <ul>
     *   <li>{@code ALL}: the job is split again; a split failure is logged and handled like
     *       the non-retryable case;</li>
     *   <li>{@code FAILED}: only tasks in a failure execute state are copied, and for the
     *       {@code BROADCAST} route strategy only those whose worker is still alive.</li>
     * </ul>
     * The instance and tasks are inserted and the dispatch is registered through
     * {@code TransactionUtils.doAfterTransactionCommit}.
     *
     * @param schedInstance the canceled instance to retry
     * @param lazyLoader    lazy accessor for the job of {@code schedInstance}
     * @throws IllegalArgumentException if the job's retry type is neither {@code ALL} nor {@code FAILED}
     */
    private void retryJob(SchedInstance schedInstance, LazyLoader<SchedJob> lazyLoader) {
        SchedJob job = lazyLoader.orElseGet(() -> {
            LOG.error("Sched job not found {}", schedInstance.getJobId());
            return null;
        });
        int retriedCount = schedInstance.obtainRetriedCount();
        boolean retryable = job != null && job.retryable(RunState.of(schedInstance.getRunState()), retriedCount);
        if (!retryable) {
            processWorkflow(schedInstance);
            return;
        }

        long retryInstanceId = generateId();
        if (schedInstance.isWorkflowNode()) {
            // Point the node row, still owned by the failed instance, at the retry instance.
            boolean relinked = this.workflowMapper.update(
                schedInstance.getWnstanceId().longValue(),
                schedInstance.parseAttach().getCurNode(),
                null,
                Long.valueOf(retryInstanceId),
                RUN_STATE_RUNNING,
                schedInstance.getInstanceId()
            ) > 0;
            Assert.isTrue(relinked, () -> "Retry workflow node failed: " + schedInstance);
        }

        int nextRetriedCount = retriedCount + 1;
        Date now = new Date();
        SchedInstance retryInstance = SchedInstance.create(
            retryInstanceId, job.getJobId().longValue(), RunType.RETRY, job.computeRetryTriggerTime(nextRetriedCount, now), nextRetriedCount, now
        );
        retryInstance.setRnstanceId(Long.valueOf(schedInstance.obtainRnstanceId()));
        retryInstance.setPnstanceId(schedInstance.getInstanceId());
        retryInstance.setWnstanceId(schedInstance.getWnstanceId());
        retryInstance.setAttach(schedInstance.getAttach());

        RetryType retryType = RetryType.of(job.getRetryType());
        List<SchedTask> tasks;
        if (retryType == RetryType.ALL) {
            try {
                tasks = splitTasks(JobHandlerParam.from(job), retryInstance.getInstanceId().longValue(), now);
            } catch (Throwable th) {
                LOG.error("Split retry job error: " + job + ", " + schedInstance, th);
                processWorkflow(schedInstance);
                return;
            }
        } else if (retryType == RetryType.FAILED) {
            tasks = this.taskMapper.findLargeByInstanceId(schedInstance.getInstanceId().longValue())
                .stream()
                .filter(task -> ExecuteState.of(task.getExecuteState()).isFailure())
                .filter(task -> !RouteStrategy.BROADCAST.equals(job.getRouteStrategy()) || super.isAliveWorker(task.getWorker()))
                .map(task -> SchedTask.create(
                    task.getTaskParam(), generateId(), retryInstanceId, task.getTaskNo().intValue(), task.getTaskCount().intValue(), now, task.getWorker()
                ))
                .collect(Collectors.toList());
        } else {
            throw new IllegalArgumentException("Unknown job retry type: " + job.getJobId() + ", " + retryType);
        }

        Assert.notEmpty(tasks, "Insert list of task cannot be empty.");
        this.instanceMapper.insert(retryInstance);
        Collects.batchProcess(tasks, this.taskMapper::batchInsert, 200);

        final List<SchedTask> retryTasks = tasks;
        TransactionUtils.doAfterTransactionCommit(() -> super.dispatch(job, retryInstance, retryTasks));
    }

    /**
     * Triggers the jobs registered as children of the finished instance's job.
     * <p>
     * For each dependency whose child job exists and is not {@code DISABLE}, a {@code DEPEND}
     * instance is created through {@code TransactionUtils.doInNestedTransaction}; its trigger
     * time is the parent's trigger time truncated to whole seconds plus the dependency sequence.
     * A failure while creating one child is logged by the error handler passed to that call.
     * When that call yields a non-null dispatch action, it is registered through
     * {@code TransactionUtils.doAfterTransactionCommit}. A missing child job is logged and skipped.
     *
     * @param schedInstance the finished parent instance
     */
    private void dependJob(SchedInstance schedInstance) {
        List<SchedDepend> depends = this.dependMapper.findByParentJobId(schedInstance.getJobId().longValue());
        if (CollectionUtils.isEmpty(depends)) {
            return;
        }

        for (SchedDepend depend : depends) {
            SchedJob childJob = this.jobMapper.get(depend.getChildJobId().longValue());
            if (childJob == null) {
                LOG.error("Child sched job not found: {} | {}", depend.getParentJobId(), depend.getChildJobId());
                continue;
            }
            if (JobState.DISABLE.equals(childJob.getJobState())) {
                continue;
            }

            Runnable dispatchAction = TransactionUtils.doInNestedTransaction(
                Objects.requireNonNull(this.transactionTemplate.getTransactionManager()),
                () -> {
                    TriggerInstanceCreator<?> creator = TriggerInstanceCreator.of(childJob.getJobType(), this);
                    long triggerTime = ((schedInstance.getTriggerTime().longValue() / 1000) * 1000) + depend.getSequence().intValue();
                    TriggerInstance triggerInstance = creator.create(childJob, RunType.DEPEND, triggerTime);
                    triggerInstance.getInstance().setRnstanceId(Long.valueOf(schedInstance.obtainRnstanceId()));
                    triggerInstance.getInstance().setPnstanceId(schedInstance.getInstanceId());
                    createInstance(triggerInstance);
                    return () -> {
                        creator.dispatch(childJob, triggerInstance);
                    };
                },
                th -> {
                    LOG.error("Depend job instance created fail: " + schedInstance + " | " + childJob, th);
                }
            );
            if (dispatchAction != null) {
                TransactionUtils.doAfterTransactionCommit(dispatchAction);
            }
        }
    }

    /**
     * Builds the operation parameters for the tasks of an instance that are currently executing.
     * <p>
     * Only tasks in {@code EXECUTING} state are considered. A task whose worker is alive yields
     * a parameter for that worker. A task whose worker is not alive is terminated directly in
     * storage with the operation's target state; only if that update does not affect exactly
     * one row is a parameter still produced for it.
     *
     * @param schedInstance the instance whose tasks are inspected
     * @param operations    the operation to apply to the executing tasks
     * @return the parameters to send to workers, possibly empty
     */
    private List<ExecuteTaskParam> loadExecutingTasks(SchedInstance schedInstance, Operations operations) {
        List<ExecuteTaskParam> executingTasks = new ArrayList<>();
        // The job is resolved through a lazy loader bound to the job mapper lookup.
        SchedJob job = LazyLoader.of(SchedJob.class, this.jobMapper::get, schedInstance.getJobId());
        ExecuteTaskParam.Builder builder = ExecuteTaskParam.builder(schedInstance, job);

        for (SchedTask task : this.taskMapper.findBaseByInstanceId(schedInstance.getInstanceId().longValue())) {
            if (!ExecuteState.EXECUTING.equals(task.getExecuteState())) {
                continue;
            }

            Worker worker = Worker.deserialize(task.getWorker());
            if (super.isAliveWorker(worker)) {
                executingTasks.add(builder.build(operations, task.getTaskId().longValue(), 0L, worker));
                continue;
            }

            // The worker is not alive, so the task is terminated here instead of being sent to it.
            boolean terminated = isOneAffectedRow(this.taskMapper.terminate(
                task.getTaskId().longValue(),
                task.getWorker(),
                operations.toState().value(),
                ExecuteState.EXECUTING.value(),
                operations.toState().isTerminal() ? new Date() : null,
                null
            ));
            if (terminated) {
                LOG.info("Cancel the dead task success: {}", task);
            } else {
                LOG.error("Cancel the dead task failed: {}", task);
                executingTasks.add(builder.build(operations, task.getTaskId().longValue(), 0L, worker));
            }
        }
        return executingTasks;
    }

    /**
     * Loads everything needed to dispatch the waiting tasks of an instance.
     *
     * @param j the instance id
     * @param i the number of {@code WAITING} tasks the caller expects
     * @return a tuple of the job, the instance and its {@code WAITING} tasks
     * @throws IllegalStateException if the number of {@code WAITING} tasks differs from {@code i}
     */
    private Tuple3<SchedJob, SchedInstance, List<SchedTask>> buildDispatchParams(long j, int i) {
        SchedInstance instance = this.instanceMapper.get(j);
        SchedJob job = this.jobMapper.get(instance.getJobId().longValue());
        Assert.notNull(job, "Not found job: " + instance.getJobId());

        List<SchedTask> waitingTasks = this.taskMapper.findLargeByInstanceId(j)
            .stream()
            .filter(task -> ExecuteState.WAITING.equals(task.getExecuteState()))
            .collect(Collectors.toList());

        int actual = waitingTasks.size();
        if (actual != i) {
            throw new IllegalStateException("Invalid dispatching tasks size: expect=" + i + ", actual=" + actual);
        }
        return Tuple3.of(job, instance, waitingTasks);
    }
}
