package cn.kstry.framework.core.engine;

import cn.kstry.framework.core.bpmn.FlowElement;
import cn.kstry.framework.core.bpmn.SequenceFlow;
import cn.kstry.framework.core.bpmn.ServiceTask;
import cn.kstry.framework.core.bpmn.enums.BpmnTypeEnum;
import cn.kstry.framework.core.bus.BasicStoryBus;
import cn.kstry.framework.core.component.hook.FlowElementHook;
import cn.kstry.framework.core.constant.GlobalProperties;
import cn.kstry.framework.core.container.MethodWrapper;
import cn.kstry.framework.core.container.TaskContainer;
import cn.kstry.framework.core.engine.facade.CustomRoleInfo;
import cn.kstry.framework.core.enums.AsyncTaskState;
import cn.kstry.framework.core.exception.ExceptionEnum;
import cn.kstry.framework.core.exception.KstryException;
import cn.kstry.framework.core.role.Role;
import cn.kstry.framework.core.task.facade.TaskServiceDef;
import cn.kstry.framework.core.task.impl.TaskComponentRegisterProxy;
import cn.kstry.framework.core.util.AssertUtil;
import cn.kstry.framework.core.util.ElementPropertyUtil;
import cn.kstry.framework.core.util.GlobalUtil;
import cn.kstry.framework.core.util.ProxyUtil;
import cn.kstry.framework.core.util.TaskServiceUtil;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;

/* loaded from: input_file:cn/kstry/framework/core/engine/BasicFlowTask.class */
public abstract class BasicFlowTask {
    /** Class logger; assigned in the static initializer at the bottom of this class. */
    private static final Logger LOGGER;
    /** Executor used to submit follow-up {@code AsyncFlowTask}s and handed to async fork/join tasks. */
    private final ThreadPoolExecutor asyncThreadPool;
    /** Registry queried for the {@code TaskServiceDef} matching a component name, service name and role. */
    private final TaskContainer taskContainer;
    /** Strategy forwarded to {@code TaskServiceUtil.getTaskParams} when task method parameters are built. */
    private final Function<MethodWrapper.ParamInjectDef, Object> paramInitStrategy;
    /**
     * Decompiler-surfaced assertion flag: set in the static initializer to the negation of
     * {@code desiredAssertionStatus()} and consulted by {@code submitAsyncTask}.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Base subscriber for reactive task results.
     *
     * <p>Every callback runs with the story request id placed in the {@link MDC} and clears the
     * MDC afterwards. {@code onNext} and {@code onComplete} are skipped (and a cancellation
     * message is logged) when the register's async task cell reports cancellation; {@code onError}
     * is always forwarded.
     */
    private static abstract class FlowTaskSubscriber extends BaseSubscriber<Object> {
        private final FlowRegister flowRegister;

        public FlowTaskSubscriber(FlowRegister flowRegister) {
            this.flowRegister = flowRegister;
        }

        /** Requests exactly one element from the upstream publisher. */
        protected void hookOnSubscribe(@Nonnull Subscription subscription) {
            request(1L);
        }

        /** Forwards the emitted value to {@link #doNextHook(Object)} unless the task was cancelled. */
        protected void hookOnNext(@Nonnull Object obj) {
            withRequestContext(true, () -> doNextHook(obj));
        }

        /** Forwards completion to {@link #doCompleteHook()} unless the task was cancelled. */
        protected void hookOnComplete() {
            withRequestContext(true, this::doCompleteHook);
        }

        /** Forwards the failure to {@link #doErrorHook(Throwable)}; no cancellation check is made. */
        protected void hookOnError(@Nonnull Throwable th) {
            withRequestContext(false, () -> doErrorHook(th));
        }

        /**
         * Runs {@code callback} with the request id in the MDC, always clearing the MDC afterwards.
         *
         * @param honourCancellation when {@code true}, a cancelled task cell causes the callback
         *                           to be skipped and a cancellation message to be logged instead
         * @param callback           the hook body to execute
         */
        private void withRequestContext(boolean honourCancellation, Runnable callback) {
            try {
                MDC.put(GlobalProperties.KSTRY_STORY_REQUEST_ID_NAME, this.flowRegister.getRequestId());
                if (honourCancellation && this.flowRegister.getAsyncTaskCell().isCancelled()) {
                    BasicFlowTask.LOGGER.info("[{}] Story task was cancelled! startId: {}", ExceptionEnum.TASK_CANCELLED.getExceptionCode(), this.flowRegister.getStartFlowElement().getId());
                    return;
                }
                callback.run();
            } finally {
                MDC.clear();
            }
        }

        /** Handles a value emitted by the publisher. */
        abstract void doNextHook(Object obj);

        /** Handles a failure signalled by the publisher. */
        abstract void doErrorHook(Throwable th);

        /** Handles completion of the publisher. */
        abstract void doCompleteHook();
    }

    /**
     * Creates a flow task bound to its collaborators; every argument is checked with
     * {@code AssertUtil.notNull}.
     *
     * @param threadPoolExecutor executor used for asynchronous flow tasks
     * @param taskContainer      registry used to look up task service definitions
     * @param function           strategy used when initialising task method parameters
     */
    public BasicFlowTask(ThreadPoolExecutor threadPoolExecutor, TaskContainer taskContainer, Function<MethodWrapper.ParamInjectDef, Object> function) {
        // Checks run in the same order as before: container, executor, strategy.
        AssertUtil.notNull(taskContainer);
        this.taskContainer = taskContainer;

        AssertUtil.notNull(threadPoolExecutor);
        this.asyncThreadPool = threadPoolExecutor;

        AssertUtil.notNull(function);
        this.paramInitStrategy = function;
    }

    /**
     * Walks the flow elements produced by {@code flowRegister} and executes them in order.
     *
     * <ul>
     *   <li>Elements for which {@code ElementPropertyUtil.needOpenAsync} is true are handed to
     *       {@code submitAsyncTask}.</li>
     *   <li>Elements that are not service tasks are skipped.</li>
     *   <li>Service tasks are resolved through the task container and invoked. A missing
     *       definition is skipped when the task allows absence, otherwise a
     *       {@code TASK_SERVICE_MATCH_ERROR} exception is raised.</li>
     *   <li>When the invocation returns a {@link Mono}, the result is handled by a subscriber
     *       and this method returns immediately.</li>
     * </ul>
     *
     * <p>JADX note: the access modifier was changed by the decompiler; the method was originally
     * {@code protected}.
     *
     * <p>NOTE(review): the decompiled body declared an unassigned {@code strictMode} local that
     * was only read inside an empty {@code finally} block around the invocation. That suggests
     * the original source had failure handling here (possibly tolerating errors outside strict
     * mode) which the decompiler could not reconstruct - confirm against the upstream sources.
     *
     * @param role          role used to match task services
     * @param basicStoryBus bus passed to task invocations and result notification
     * @param flowRegister  source of the next flow element and of monitor tracking
     */
    public void doExe(Role role, BasicStoryBus basicStoryBus, FlowRegister flowRegister) {
        // Advancing in the loop's update clause guarantees every `continue` moves on to the next
        // element; previously a non-service-task element was re-read forever.
        for (Optional<FlowElement> next = flowRegister.nextElement(basicStoryBus);
             next.isPresent();
             next = flowRegister.nextElement(basicStoryBus)) {
            FlowElement flowElement = next.get();
            if (ElementPropertyUtil.needOpenAsync(flowElement)) {
                submitAsyncTask(role, basicStoryBus, flowRegister, flowElement);
                continue;
            }
            if (flowElement.getElementType() != BpmnTypeEnum.SERVICE_TASK) {
                continue;
            }

            ServiceTask serviceTask = (ServiceTask) flowElement;
            Optional<TaskServiceDef> taskServiceDef = getTaskContainer().getTaskServiceDef(serviceTask.getTaskComponent(), serviceTask.getTaskService(), role);
            if (!taskServiceDef.isPresent() && serviceTask.allowAbsent()) {
                // No matching service and the task tolerates that: nothing to run.
                continue;
            }
            TaskServiceDef matchedDef = taskServiceDef.orElseThrow(() -> KstryException.buildException(ExceptionEnum.TASK_SERVICE_MATCH_ERROR,
                    ExceptionEnum.TASK_SERVICE_MATCH_ERROR.getDesc() + GlobalUtil.format(" service task id: {}, name: {}", serviceTask.getId(), serviceTask.getName())));

            // Record which method/target/ability is about to run, when tracking exists for this node.
            flowRegister.getMonitorTracking().getServiceNodeTracking(flowElement).ifPresent(nodeTracking -> {
                MethodWrapper methodWrapper = matchedDef.getMethodWrapper();
                nodeTracking.setMethodName(methodWrapper.getMethod().getName());
                nodeTracking.setTargetName(matchedDef.getTaskComponentTarget().getTarget().getClass().getName());
                nodeTracking.setAbility((String) Optional.ofNullable(methodWrapper.getAbility()).filter(StringUtils::isNotBlank).orElse(null));
            });

            Object result = doInvokeMethod(serviceTask, matchedDef, basicStoryBus, role);
            if (result instanceof Mono) {
                // The subscriber continues the flow once the Mono signals; stop the synchronous walk here.
                monoResultHandler(role, basicStoryBus, flowRegister, serviceTask, matchedDef, result);
                flowRegister.getMonitorTracking().finishTaskTracking(flowElement, null);
                return;
            }
            resultHandler(role, basicStoryBus, serviceTask, result, matchedDef);
            flowRegister.getMonitorTracking().finishTaskTracking(flowElement, null);
        }
    }

    /**
     * Returns the task container supplied at construction time.
     *
     * @return the container used to look up task service definitions
     */
    protected TaskContainer getTaskContainer() {
        return this.taskContainer;
    }

    /**
     * Returns the parameter initialisation strategy supplied at construction time.
     *
     * @return the strategy passed on when task method parameters are resolved
     */
    protected Function<MethodWrapper.ParamInjectDef, Object> getParamInitStrategy() {
        return this.paramInitStrategy;
    }

    /**
     * Returns the executor supplied at construction time.
     *
     * @return the thread pool on which asynchronous flow tasks are submitted
     */
    public ThreadPoolExecutor getAsyncThreadPool() {
        return this.asyncThreadPool;
    }

    /**
     * Supplies the {@code AsyncPropertyDef} used to construct each follow-up {@code AsyncFlowTask}
     * created after a reactive ({@code Mono}) task result has been handled.
     *
     * @return the async property definition provided by the concrete flow task
     */
    protected abstract AsyncPropertyDef getAsyncPropertyDef();

    /**
     * Subscribes to a task result that is a {@link Mono} and continues the flow once it signals.
     *
     * <p>On a value or on empty completion the result (the value, or {@code null}) is passed to
     * {@code resultHandler}, then a new {@code AsyncFlowTask} is submitted to the async thread
     * pool and its future is registered with {@code flowRegister}. On error the register's async
     * task cell is notified of the error and then cancelled.
     *
     * @param role           role forwarded to result handling
     * @param basicStoryBus  bus forwarded to result handling
     * @param flowRegister   register that receives the submitted future and error notifications
     * @param serviceTask    the service task whose invocation produced the Mono
     * @param taskServiceDef definition of the invoked task service
     * @param obj            the invocation result, converted via {@code GlobalUtil.transferNotEmpty}
     */
    private void monoResultHandler(final Role role, final BasicStoryBus basicStoryBus, final FlowRegister flowRegister, final ServiceTask serviceTask, final TaskServiceDef taskServiceDef, Object obj) {
        Mono resultMono = (Mono) GlobalUtil.transferNotEmpty(obj, Mono.class);
        FlowTaskSubscriber resultSubscriber = new FlowTaskSubscriber(flowRegister) { // from class: cn.kstry.framework.core.engine.BasicFlowTask.1
            @Override // cn.kstry.framework.core.engine.BasicFlowTask.FlowTaskSubscriber
            protected void doNextHook(Object emitted) {
                BasicFlowTask.this.resultHandler(role, basicStoryBus, serviceTask, emitted, taskServiceDef);
                AsyncFlowTask followUp = new AsyncFlowTask(BasicFlowTask.this.getAsyncPropertyDef(), flowRegister);
                try {
                    Future<AsyncTaskState> future = BasicFlowTask.this.getAsyncThreadPool().submit(followUp);
                    BasicFlowTask.LOGGER.debug("submit async story task. startId: {}", followUp.getStartEventId());
                    flowRegister.addTaskFuture(future);
                    followUp.openSwitch();
                    // Dispose only on the success path, after the switch has been opened.
                    dispose();
                } catch (Throwable failure) {
                    // The switch is opened on failure as well, before the error propagates.
                    followUp.openSwitch();
                    throw failure;
                }
            }

            @Override // cn.kstry.framework.core.engine.BasicFlowTask.FlowTaskSubscriber
            protected void doErrorHook(Throwable th) {
                flowRegister.getAsyncTaskCell().errorNotice(th);
                flowRegister.getAsyncTaskCell().cancel();
            }

            @Override // cn.kstry.framework.core.engine.BasicFlowTask.FlowTaskSubscriber
            protected void doCompleteHook() {
                // Completion without a value: the result is reported as null.
                BasicFlowTask.this.resultHandler(role, basicStoryBus, serviceTask, null, taskServiceDef);
                AsyncFlowTask followUp = new AsyncFlowTask(BasicFlowTask.this.getAsyncPropertyDef(), flowRegister);
                try {
                    Future<AsyncTaskState> future = BasicFlowTask.this.getAsyncThreadPool().submit(followUp);
                    flowRegister.addTaskFuture(future);
                } finally {
                    followUp.openSwitch();
                }
            }
        };
        resultMono.subscribe(resultSubscriber);
    }

    /**
     * Publishes a task result on the story bus and then runs the task's custom role step.
     *
     * <p>JADX note: the access modifier was changed by the decompiler; the method was originally
     * {@code private}.
     *
     * @param role           role forwarded to the custom role step
     * @param basicStoryBus  bus that is notified of the result
     * @param serviceTask    the service task that produced the result
     * @param obj            the task result; callers in this class pass {@code null} when a Mono
     *                       completes without emitting a value
     * @param taskServiceDef definition of the invoked task service
     */
    public void resultHandler(Role role, BasicStoryBus basicStoryBus, ServiceTask serviceTask, Object obj, TaskServiceDef taskServiceDef) {
        basicStoryBus.noticeResult(serviceTask, obj, taskServiceDef);
        customRoleInfo(role, basicStoryBus, serviceTask);
    }

    /**
     * Invokes the task service named by the service task's {@code CustomRoleInfo}, if it has one.
     *
     * <p>Nothing happens when the task carries no custom role info, or when no matching service
     * definition exists and the task allows absence. A missing definition otherwise raises a
     * {@code TASK_SERVICE_MATCH_ERROR} exception. The invocation's return value is discarded.
     *
     * @param role          role used to match the task service
     * @param basicStoryBus bus passed to the invocation
     * @param serviceTask   the service task whose custom role info is processed
     */
    private void customRoleInfo(Role role, BasicStoryBus basicStoryBus, ServiceTask serviceTask) {
        CustomRoleInfo roleInfo = serviceTask.getCustomRoleInfo();
        if (roleInfo != null) {
            Optional<TaskServiceDef> candidate = getTaskContainer().getTaskServiceDef(roleInfo.getTaskComponentName(), roleInfo.getTaskServiceName(), role);
            if (!candidate.isPresent() && serviceTask.allowAbsent()) {
                return;
            }
            TaskServiceDef resolved = candidate.orElseThrow(() -> KstryException.buildException(
                    ExceptionEnum.TASK_SERVICE_MATCH_ERROR,
                    ExceptionEnum.TASK_SERVICE_MATCH_ERROR.getDesc() + GlobalUtil.format(" service task id: {}, name: {}", serviceTask.getId(), serviceTask.getName())));
            doInvokeMethod(serviceTask, resolved, basicStoryBus, role);
        }
    }

    /**
     * Registers a hook on an async-capable flow element and triggers it.
     *
     * <p>When the hook fires, an {@code AsyncTaskForkJoin} is configured with this task's
     * collaborators and one async flow register is submitted per element of the list the hook
     * receives (each element is cast to {@code SequenceFlow}).
     *
     * @param role          role given to the fork/join task
     * @param basicStoryBus bus given to the fork/join task
     * @param flowRegister  register from which per-branch async registers are derived
     * @param flowElement   element expected to implement {@code FlowElementHook}
     */
    private void submitAsyncTask(Role role, BasicStoryBus basicStoryBus, FlowRegister flowRegister, FlowElement flowElement) {
        // Decompiled form of an assertion: only enforced when assertions are enabled.
        if (!$assertionsDisabled && !(flowElement instanceof FlowElementHook)) {
            throw new AssertionError();
        }
        FlowElementHook hookElement = (FlowElementHook) flowElement;
        hookElement.hook(list -> {
            AsyncTaskForkJoin forkJoin = new AsyncTaskForkJoin();
            forkJoin.setRole(role);
            forkJoin.setStoryBus(basicStoryBus);
            forkJoin.setTaskContainer(getTaskContainer());
            forkJoin.setParamInitStrategy(getParamInitStrategy());
            forkJoin.setAsyncThreadPool(getAsyncThreadPool());
            for (Iterator branches = list.iterator(); branches.hasNext(); ) {
                FlowElement branch = (FlowElement) branches.next();
                forkJoin.submitTask(flowRegister.asyncFlowRegister((SequenceFlow) branch));
            }
        });
        hookElement.trigger();
    }

    /**
     * Invokes the target method of a task service through {@code ProxyUtil}.
     *
     * <p>When the method wrapper declares no parameter injection definitions the method is
     * invoked without a parameter supplier; otherwise a supplier backed by
     * {@code TaskServiceUtil.getTaskParams} is passed along.
     *
     * @param serviceTask    the service task being executed
     * @param taskServiceDef definition providing the method wrapper and the component target
     * @param basicStoryBus  bus passed to the invocation and to parameter resolution
     * @param role           role passed to parameter resolution
     * @return whatever {@code ProxyUtil.invokeMethod} returns
     */
    private Object doInvokeMethod(ServiceTask serviceTask, TaskServiceDef taskServiceDef, BasicStoryBus basicStoryBus, Role role) {
        MethodWrapper wrapper = taskServiceDef.getMethodWrapper();
        TaskComponentRegisterProxy componentProxy = taskServiceDef.getTaskComponentTarget();
        AssertUtil.notNull(wrapper.getMethod());
        AssertUtil.notNull(componentProxy.getTarget());

        List<MethodWrapper.ParamInjectDef> injectDefs = wrapper.getParamInjectDefs();
        if (CollectionUtils.isEmpty(injectDefs)) {
            return ProxyUtil.invokeMethod(basicStoryBus, wrapper, componentProxy.getTarget());
        }
        return ProxyUtil.invokeMethod(basicStoryBus, wrapper, componentProxy.getTarget(),
                () -> TaskServiceUtil.getTaskParams(serviceTask, basicStoryBus, role, componentProxy, injectDefs, getParamInitStrategy()));
    }

    // Static initialisation: caches the (negated) assertion status of this class and creates the logger.
    static {
        $assertionsDisabled = !BasicFlowTask.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(BasicFlowTask.class);
    }
}
