package me.kpali.wolfflow.core.executor.impl;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import me.kpali.wolfflow.core.cluster.IClusterController;
import me.kpali.wolfflow.core.config.ExecutorConfig;
import me.kpali.wolfflow.core.enums.TaskStatusEnum;
import me.kpali.wolfflow.core.event.TaskStatusEventPublisher;
import me.kpali.wolfflow.core.exception.TaskExecuteException;
import me.kpali.wolfflow.core.exception.TaskFlowExecuteException;
import me.kpali.wolfflow.core.exception.TaskFlowInterruptedException;
import me.kpali.wolfflow.core.exception.TaskInterruptedException;
import me.kpali.wolfflow.core.exception.TaskStopException;
import me.kpali.wolfflow.core.exception.TryLockException;
import me.kpali.wolfflow.core.executor.ITaskFlowExecutor;
import me.kpali.wolfflow.core.logger.ITaskLogger;
import me.kpali.wolfflow.core.model.ClusterConstants;
import me.kpali.wolfflow.core.model.ContextKey;
import me.kpali.wolfflow.core.model.Link;
import me.kpali.wolfflow.core.model.Task;
import me.kpali.wolfflow.core.model.TaskContextWrapper;
import me.kpali.wolfflow.core.model.TaskFlow;
import me.kpali.wolfflow.core.model.TaskFlowContextWrapper;
import me.kpali.wolfflow.core.model.TaskLog;
import me.kpali.wolfflow.core.scheduler.impl.SystemTimeUtils;
import me.kpali.wolfflow.core.util.TaskFlowUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:me/kpali/wolfflow/core/executor/impl/DefaultTaskFlowExecutor.class */
public class DefaultTaskFlowExecutor implements ITaskFlowExecutor {
    private static final Logger log = LoggerFactory.getLogger(DefaultTaskFlowExecutor.class);

    @Autowired
    private ExecutorConfig executorConfig;

    @Autowired
    private TaskStatusEventPublisher taskStatusEventPublisher;

    @Autowired
    private IClusterController clusterController;

    @Autowired
    private ITaskLogger taskLogger;

    @Autowired
    private SystemTimeUtils systemTimeUtils;

    @Override // me.kpali.wolfflow.core.executor.ITaskFlowExecutor
    public void beforeExecute(TaskFlow taskFlow, Map<String, Object> map) throws TaskFlowExecuteException {
        TaskFlow taskFlow2;
        TaskFlow taskFlow3;
        Map<String, Object> taskContext;
        TaskFlowContextWrapper taskFlowContextWrapper = new TaskFlowContextWrapper(map);
        Long l = (Long) taskFlowContextWrapper.getValue("logId", Long.class);
        if (TaskFlowUtils.topologicalSort(taskFlow) == null) {
            throw new TaskFlowExecuteException("任务流不是一个有向无环图，请检查是否存在回路！");
        }
        if (taskFlow.getTaskList().size() == 0) {
            return;
        }
        Long l2 = (Long) taskFlowContextWrapper.getValue(ContextKey.FROM_TASK_ID, Long.class);
        Long l3 = (Long) taskFlowContextWrapper.getValue(ContextKey.TO_TASK_ID, Long.class);
        if (l2 == null && l3 == null) {
            taskFlow2 = TaskFlowUtils.prune(taskFlow, null, null);
            taskFlow3 = taskFlow2;
        } else if (l2 != null && l2.equals(l3)) {
            taskFlow2 = TaskFlowUtils.prune(taskFlow, l2, l3);
            taskFlow3 = TaskFlowUtils.prune(taskFlow, l2, null);
        } else if (l2 != null) {
            taskFlow2 = TaskFlowUtils.prune(taskFlow, l2, l3);
            taskFlow3 = taskFlow2;
        } else {
            TaskFlow prune = TaskFlowUtils.prune(taskFlow, l2, l3);
            TaskFlow excludeSuccessfulTasks = excludeSuccessfulTasks(prune);
            taskFlow2 = excludeSuccessfulTasks == null ? prune : excludeSuccessfulTasks;
            taskFlow3 = taskFlow2;
        }
        map.put(ContextKey.EXECUTE_TASK_FLOW, taskFlow2);
        try {
            boolean tryLock = this.clusterController.tryLock(ClusterConstants.TASK_LOG_LOCK, 10L, 15L, TimeUnit.SECONDS);
            if (!tryLock) {
                throw new TryLockException("获取任务日志记录锁失败！");
            }
            for (Task task : taskFlow.getTaskList()) {
                boolean z = false;
                Iterator<Task> it = taskFlow3.getTaskList().iterator();
                while (true) {
                    if (it.hasNext()) {
                        if (task.getId().equals(it.next().getId())) {
                            z = true;
                            break;
                        }
                    } else {
                        break;
                    }
                }
                if (z) {
                    this.taskLogger.deleteTaskStatus(task.getId());
                    TaskContextWrapper taskContextWrapper = new TaskContextWrapper();
                    ArrayList arrayList = new ArrayList();
                    taskFlow.getLinkList().forEach(link -> {
                        if (link.getTarget().equals(task.getId())) {
                            arrayList.add(link.getSource());
                        }
                    });
                    taskContextWrapper.put(ContextKey.PARENT_TASK_ID_LIST, arrayList);
                    taskFlowContextWrapper.putTaskContext(task.getId().toString(), taskContextWrapper.getContext());
                } else {
                    TaskLog taskStatus = this.taskLogger.getTaskStatus(task.getId());
                    if (taskStatus != null) {
                        taskStatus.setLogId(Long.valueOf(this.systemTimeUtils.getUniqueTimeStamp()));
                        taskStatus.setTaskFlowLogId(l);
                        this.taskLogger.add(taskStatus);
                        Map<String, Object> taskFlowContext = taskStatus.getTaskFlowContext();
                        if (taskFlowContext != null) {
                            TaskFlowContextWrapper taskFlowContextWrapper2 = new TaskFlowContextWrapper(taskFlowContext);
                            if (taskFlowContextWrapper2.getTaskContexts() != null && (taskContext = taskFlowContextWrapper2.getTaskContext(task.getId().toString())) != null) {
                                taskFlowContextWrapper.putTaskContext(task.getId().toString(), taskContext);
                            }
                        }
                    }
                }
            }
            if (tryLock) {
                this.clusterController.unlock(ClusterConstants.TASK_LOG_LOCK);
            }
        } catch (Throwable th) {
            if (0 != 0) {
                this.clusterController.unlock(ClusterConstants.TASK_LOG_LOCK);
            }
            throw th;
        }
    }

    @Override // me.kpali.wolfflow.core.executor.ITaskFlowExecutor
    public void execute(TaskFlow taskFlow, Map<String, Object> map) throws TaskFlowExecuteException, TaskFlowInterruptedException {
        TaskFlowContextWrapper taskFlowContextWrapper = new TaskFlowContextWrapper(map);
        Long l = (Long) taskFlowContextWrapper.getValue("logId", Long.class);
        TaskFlow taskFlow2 = (TaskFlow) taskFlowContextWrapper.getValue(ContextKey.EXECUTE_TASK_FLOW, TaskFlow.class);
        try {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(this.executorConfig.getCorePoolSize().intValue(), this.executorConfig.getMaximumPoolSize().intValue(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024), new ThreadFactoryBuilder().setNameFormat("taskFlowExecutor-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            taskFlow.getTaskList().forEach(task -> {
                hashMap.put(task.getId(), task);
                ArrayList arrayList = new ArrayList();
                for (Link link : taskFlow.getLinkList()) {
                    if (link.getTarget().equals(task.getId())) {
                        arrayList.add(link.getSource());
                    }
                }
                hashMap2.put(task.getId(), arrayList);
            });
            HashMap hashMap3 = new HashMap();
            HashMap hashMap4 = new HashMap();
            taskFlow2.getTaskList().forEach(task2 -> {
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                for (Link link : taskFlow2.getLinkList()) {
                    if (link.getTarget().equals(task2.getId())) {
                        arrayList.add(link.getSource());
                    }
                    if (link.getSource().equals(task2.getId())) {
                        arrayList2.add(link.getTarget());
                    }
                }
                hashMap3.put(task2.getId(), arrayList);
                hashMap4.put(task2.getId(), arrayList2);
            });
            HashMap hashMap5 = new HashMap();
            taskFlow2.getTaskList().forEach(task3 -> {
                hashMap5.put(task3.getId(), Integer.valueOf(((List) hashMap3.get(task3.getId())).size()));
            });
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            for (Long l2 : hashMap5.keySet()) {
                if (((Integer) hashMap5.get(l2)).intValue() == 0) {
                    concurrentHashMap.put(l2, TaskStatusEnum.WAIT_FOR_EXECUTE.getCode());
                    this.taskStatusEventPublisher.publishEvent((Task) hashMap.get(l2), taskFlow2.getId(), map, TaskStatusEnum.WAIT_FOR_EXECUTE.getCode(), null, true);
                }
            }
            boolean z = true;
            boolean z2 = false;
            while (!concurrentHashMap.isEmpty()) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    log.warn(e.getMessage(), e);
                }
                z2 = this.clusterController.stopRequestContains(l).booleanValue();
                for (Long l3 : concurrentHashMap.keySet()) {
                    String str = (String) concurrentHashMap.get(l3);
                    if (TaskStatusEnum.WAIT_FOR_EXECUTE.getCode().equals(str)) {
                        Task task4 = (Task) hashMap.get(l3);
                        concurrentHashMap.put(task4.getId(), TaskStatusEnum.EXECUTING.getCode());
                        threadPoolExecutor.execute(() -> {
                            try {
                                Iterator it = ((List) hashMap2.get(task4.getId())).iterator();
                                while (it.hasNext()) {
                                    TaskLog taskStatus = this.taskLogger.getTaskStatus((Long) it.next());
                                    if (taskStatus == null || !TaskStatusEnum.EXECUTE_SUCCESS.getCode().equals(taskStatus.getStatus())) {
                                        throw new TaskExecuteException("父任务必须先执行成功");
                                    }
                                }
                                task4.beforeExecute(map);
                                this.taskStatusEventPublisher.publishEvent(task4, taskFlow2.getId(), map, TaskStatusEnum.EXECUTING.getCode(), null, true);
                                task4.execute(map);
                                task4.afterExecute(map);
                                concurrentHashMap.put(task4.getId(), TaskStatusEnum.EXECUTE_SUCCESS.getCode());
                                this.taskStatusEventPublisher.publishEvent(task4, taskFlow2.getId(), map, TaskStatusEnum.EXECUTE_SUCCESS.getCode(), null, true);
                            } catch (TaskExecuteException | TaskInterruptedException e2) {
                                log.error("任务执行失败！任务ID：" + task4.getId() + " 异常信息：" + e2.getMessage(), e2);
                                concurrentHashMap.put(task4.getId(), TaskStatusEnum.EXECUTE_FAILURE.getCode());
                                this.taskStatusEventPublisher.publishEvent(task4, taskFlow2.getId(), map, TaskStatusEnum.EXECUTE_FAILURE.getCode(), e2.getMessage(), true);
                            }
                        });
                    } else if (z2 && TaskStatusEnum.EXECUTING.getCode().equals(str)) {
                        Task task5 = (Task) hashMap.get(l3);
                        concurrentHashMap.put(task5.getId(), TaskStatusEnum.STOPPING.getCode());
                        try {
                            this.taskStatusEventPublisher.publishEvent(task5, taskFlow2.getId(), map, TaskStatusEnum.STOPPING.getCode(), null, true);
                            task5.stop(map);
                        } catch (TaskStopException e2) {
                            log.error("任务终止失败！任务ID：" + task5.getId() + " 异常信息：" + e2.getMessage(), e2);
                        }
                    } else if (TaskStatusEnum.EXECUTE_SUCCESS.getCode().equals(str)) {
                        for (Long l4 : (List) hashMap4.get(l3)) {
                            int intValue = ((Integer) hashMap5.get(l4)).intValue() - 1;
                            hashMap5.put(l4, Integer.valueOf(intValue));
                            if (intValue == 0) {
                                concurrentHashMap.put(l4, TaskStatusEnum.WAIT_FOR_EXECUTE.getCode());
                                this.taskStatusEventPublisher.publishEvent((Task) hashMap.get(l4), taskFlow2.getId(), map, TaskStatusEnum.WAIT_FOR_EXECUTE.getCode(), null, true);
                            }
                        }
                        concurrentHashMap.remove(l3);
                    } else if (TaskStatusEnum.EXECUTE_FAILURE.getCode().equals(str) || TaskStatusEnum.SKIPPED.getCode().equals(str)) {
                        if (TaskStatusEnum.EXECUTE_FAILURE.getCode().equals(str)) {
                            z = false;
                        }
                        for (Long l5 : (List) hashMap4.get(l3)) {
                            concurrentHashMap.put(l5, TaskStatusEnum.SKIPPED.getCode());
                            this.taskStatusEventPublisher.publishEvent((Task) hashMap.get(l5), taskFlow2.getId(), map, TaskStatusEnum.SKIPPED.getCode(), null, true);
                        }
                        concurrentHashMap.remove(l3);
                    }
                }
            }
            if (z2) {
                throw new TaskFlowInterruptedException("任务流被终止执行");
            }
            if (!z) {
                throw new TaskFlowExecuteException("一个或多个任务执行失败");
            }
        } finally {
            this.clusterController.stopRequestRemove(l);
        }
    }

    @Override // me.kpali.wolfflow.core.executor.ITaskFlowExecutor
    public void afterExecute(TaskFlow taskFlow, Map<String, Object> map) throws TaskFlowExecuteException {
    }

    private TaskFlow excludeSuccessfulTasks(TaskFlow taskFlow) {
        ArrayList arrayList = new ArrayList();
        try {
            boolean tryLock = this.clusterController.tryLock(ClusterConstants.TASK_LOG_LOCK, 10L, 15L, TimeUnit.SECONDS);
            if (!tryLock) {
                throw new TryLockException("获取任务日志记录锁失败！");
            }
            List<TaskLog> listTaskStatus = this.taskLogger.listTaskStatus(taskFlow.getId());
            if (tryLock) {
                this.clusterController.unlock(ClusterConstants.TASK_LOG_LOCK);
            }
            if (listTaskStatus == null || listTaskStatus.isEmpty()) {
                return taskFlow;
            }
            for (TaskLog taskLog : listTaskStatus) {
                if (TaskStatusEnum.EXECUTE_SUCCESS.getCode().equals(taskLog.getStatus())) {
                    arrayList.add(taskLog.getTask().getId());
                }
            }
            TaskFlow taskFlow2 = new TaskFlow();
            taskFlow2.setId(taskFlow.getId());
            taskFlow2.setCron(taskFlow.getCron());
            taskFlow2.setTaskList(new ArrayList());
            taskFlow2.setLinkList(new ArrayList());
            for (Task task : taskFlow.getTaskList()) {
                if (!arrayList.contains(task.getId())) {
                    taskFlow2.getTaskList().add(task);
                }
            }
            for (Link link : taskFlow.getLinkList()) {
                if (!arrayList.contains(link.getSource()) && !arrayList.contains(link.getTarget())) {
                    taskFlow2.getLinkList().add(link);
                }
            }
            return taskFlow2;
        } catch (Throwable th) {
            if (0 != 0) {
                this.clusterController.unlock(ClusterConstants.TASK_LOG_LOCK);
            }
            throw th;
        }
    }
}
