package com.netflix.conductor.core.reconciliation;

import com.netflix.conductor.annotations.VisibleForTesting;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.core.utils.Utils;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

/**
 * Re-queues workflows and tasks whose queue messages have gone missing, and reconciles
 * {@code SUB_WORKFLOW} tasks that are still {@code IN_PROGRESS} although their sub workflow has
 * already reached a terminal status.
 *
 * <p>The bean is only created when {@code conductor.workflow-repair-service.enabled} is
 * {@code true}.
 */
@ConditionalOnProperty(name = {"conductor.workflow-repair-service.enabled"}, havingValue = "true")
@Service
public class WorkflowRepairService {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowRepairService.class);

    private final ExecutionDAO executionDAO;
    private final QueueDAO queueDAO;
    private final ConductorProperties properties;
    private final SystemTaskRegistry systemTaskRegistry;

    public WorkflowRepairService(ExecutionDAO executionDAO, QueueDAO queueDAO, ConductorProperties conductorProperties, SystemTaskRegistry systemTaskRegistry) {
        this.executionDAO = executionDAO;
        this.queueDAO = queueDAO;
        this.properties = conductorProperties;
        this.systemTaskRegistry = systemTaskRegistry;
        LOGGER.info("WorkflowRepairService Initialized");
    }

    /**
     * Verifies that the workflow has a message in the decider queue and, optionally, that each of
     * its repairable tasks has a message in its task queue; missing messages are pushed again.
     *
     * @param workflowId id of the workflow to check
     * @param includeTasks whether the workflow is loaded with its tasks and those tasks are
     *     checked as well
     * @return {@code true} if the decider queue entry or at least one task was repaired
     * @throws NotFoundException if the execution DAO returns no workflow for {@code workflowId}
     */
    public boolean verifyAndRepairWorkflow(String workflowId, boolean includeTasks) {
        WorkflowModel workflow = loadWorkflow(workflowId, includeTasks);
        boolean repaired = verifyAndRepairDeciderQueue(workflow);
        if (includeTasks) {
            for (TaskModel task : workflow.getTasks()) {
                // Non-short-circuit OR: every task must be checked, and an earlier repair must
                // not be forgotten when a later task turns out to need none.
                repaired |= verifyAndRepairTask(task);
            }
        }
        return repaired;
    }

    /**
     * Loads the workflow together with its tasks, repairs every task, and then makes sure the
     * parent workflow (if the workflow has one) is present in the decider queue.
     *
     * @param workflowId id of the workflow whose tasks are checked
     * @throws NotFoundException if the execution DAO returns no workflow for {@code workflowId}
     */
    public void verifyAndRepairWorkflowTasks(String workflowId) {
        verifyAndRepairWorkflowTasks(loadWorkflow(workflowId, true));
    }

    /**
     * Repairs every task of the given workflow, then makes sure the parent workflow (if the
     * workflow has one) is present in the decider queue.
     *
     * @param workflowModel workflow whose tasks are checked; its task list must be populated
     */
    public void verifyAndRepairWorkflowTasks(WorkflowModel workflowModel) {
        workflowModel.getTasks().forEach(this::verifyAndRepairTask);
        // An empty/absent parent id is ignored by verifyAndRepairWorkflow(String).
        verifyAndRepairWorkflow(workflowModel.getParentWorkflowId());
    }

    /**
     * Re-queues the workflow in the decider queue unless it is already in a terminal status.
     *
     * @return {@code true} if a decider queue message was pushed
     */
    private boolean verifyAndRepairDeciderQueue(WorkflowModel workflowModel) {
        if (workflowModel.getStatus().isTerminal()) {
            return false;
        }
        return verifyAndRepairWorkflow(workflowModel.getWorkflowId());
    }

    /**
     * Repairs a single task.
     *
     * <ul>
     *   <li>A repairable task (see {@link #isTaskRepairable(TaskModel)}) whose id is missing from
     *       its queue is pushed again, using the task's {@code callbackAfterSeconds} as the third
     *       argument of {@code QueueDAO.push}.
     *   <li>An {@code IN_PROGRESS} {@code SUB_WORKFLOW} task whose sub workflow is already
     *       terminal gets its status and output synchronised from the sub workflow.
     * </ul>
     *
     * @param taskModel task to check
     * @return {@code true} if the task was re-queued or updated, {@code false} otherwise
     */
    @VisibleForTesting
    boolean verifyAndRepairTask(TaskModel taskModel) {
        if (isTaskRepairable(taskModel)) {
            String queueName = QueueUtils.getQueueName(taskModel);
            if (this.queueDAO.containsMessage(queueName, taskModel.getTaskId())) {
                return false;
            }
            this.queueDAO.push(queueName, taskModel.getTaskId(), taskModel.getCallbackAfterSeconds());
            LOGGER.info("Task {} in workflow {} re-queued for repairs", taskModel.getTaskId(), taskModel.getWorkflowInstanceId());
            Monitors.recordQueueMessageRepushFromRepairService(taskModel.getTaskDefName());
            return true;
        }

        boolean inProgressSubWorkflow =
                "SUB_WORKFLOW".equals(taskModel.getTaskType())
                        && taskModel.getStatus() == TaskModel.Status.IN_PROGRESS;
        if (!inProgressSubWorkflow) {
            return false;
        }

        WorkflowModel subWorkflow = this.executionDAO.getWorkflow(taskModel.getSubWorkflowId(), false);
        if (subWorkflow == null) {
            // Nothing to reconcile against; leave the task untouched instead of failing the
            // whole repair pass with a NullPointerException.
            LOGGER.warn("Sub workflow {} referenced by task {} in workflow {} could not be found", taskModel.getSubWorkflowId(), taskModel.getTaskId(), taskModel.getWorkflowInstanceId());
            return false;
        }
        if (!subWorkflow.getStatus().isTerminal()) {
            return false;
        }
        LOGGER.info("Repairing sub workflow task {} for sub workflow {} in workflow {}", taskModel.getTaskId(), taskModel.getSubWorkflowId(), taskModel.getWorkflowInstanceId());
        repairSubWorkflowTask(taskModel, subWorkflow);
        return true;
    }

    /**
     * Decides whether a task is expected to have a message in its task queue.
     *
     * <p>Non-system tasks qualify only while {@code SCHEDULED}. System tasks must be async and
     * either {@code SCHEDULED}, or {@code IN_PROGRESS} without being "async complete".
     */
    private boolean isTaskRepairable(TaskModel task) {
        String taskType = task.getTaskType();
        if (!this.systemTaskRegistry.isSystemTask(taskType)) {
            return task.getStatus() == TaskModel.Status.SCHEDULED;
        }
        WorkflowSystemTask systemTask = this.systemTaskRegistry.get(taskType);
        if (!systemTask.isAsync()) {
            return false;
        }
        boolean scheduled = task.getStatus() == TaskModel.Status.SCHEDULED;
        boolean inProgress = task.getStatus() == TaskModel.Status.IN_PROGRESS;
        // Equivalent to the former "!asyncComplete || (asyncComplete && scheduled)".
        return (!systemTask.isAsyncComplete(task) || scheduled) && (inProgress || scheduled);
    }

    /**
     * Pushes the workflow id to the decider queue if it is not already there, passing the
     * configured workflow offset timeout (in seconds) as the third argument of
     * {@code QueueDAO.push}.
     *
     * @param workflowId workflow id; {@code null} or empty ids are ignored
     * @return {@code true} if a message was pushed
     */
    private boolean verifyAndRepairWorkflow(String workflowId) {
        if (StringUtils.isEmpty(workflowId) || this.queueDAO.containsMessage(Utils.DECIDER_QUEUE, workflowId)) {
            return false;
        }
        this.queueDAO.push(Utils.DECIDER_QUEUE, workflowId, this.properties.getWorkflowOffsetTimeout().getSeconds());
        LOGGER.info("Workflow {} re-queued for repairs", workflowId);
        Monitors.recordQueueMessageRepushFromRepairService(Utils.DECIDER_QUEUE);
        return true;
    }

    /**
     * Copies the terminal outcome of a sub workflow onto its parent task (status and output) and
     * persists the task. A sub workflow status not listed below leaves the task status unchanged.
     */
    private void repairSubWorkflowTask(TaskModel taskModel, WorkflowModel workflowModel) {
        switch (workflowModel.getStatus()) {
            case COMPLETED:
                taskModel.setStatus(TaskModel.Status.COMPLETED);
                break;
            case FAILED:
                taskModel.setStatus(TaskModel.Status.FAILED);
                break;
            case TERMINATED:
                taskModel.setStatus(TaskModel.Status.CANCELED);
                break;
            case TIMED_OUT:
                taskModel.setStatus(TaskModel.Status.TIMED_OUT);
                break;
        }
        taskModel.addOutput(workflowModel.getOutput());
        this.executionDAO.updateTask(taskModel);
    }

    /**
     * Fetches a workflow from the execution DAO.
     *
     * @throws NotFoundException if the DAO returns {@code null}
     */
    private WorkflowModel loadWorkflow(String workflowId, boolean includeTasks) {
        return Optional.ofNullable(this.executionDAO.getWorkflow(workflowId, includeTasks))
                .orElseThrow(() -> new NotFoundException("Could not find workflow: " + workflowId));
    }
}
