package cn.kstry.framework.core.engine;

import cn.kstry.framework.core.bus.BasicStoryBus;
import cn.kstry.framework.core.bus.StoryBus;
import cn.kstry.framework.core.constant.GlobalProperties;
import cn.kstry.framework.core.container.MethodWrapper;
import cn.kstry.framework.core.container.StartEventContainer;
import cn.kstry.framework.core.container.TaskContainer;
import cn.kstry.framework.core.engine.facade.StoryRequest;
import cn.kstry.framework.core.engine.facade.TaskResponse;
import cn.kstry.framework.core.engine.facade.TaskResponseBox;
import cn.kstry.framework.core.enums.AsyncTaskState;
import cn.kstry.framework.core.exception.ExceptionEnum;
import cn.kstry.framework.core.exception.KstryException;
import cn.kstry.framework.core.monitor.RecallStory;
import cn.kstry.framework.core.role.BusinessRoleRepository;
import cn.kstry.framework.core.role.Role;
import cn.kstry.framework.core.util.AssertUtil;
import cn.kstry.framework.core.util.ElementParserUtil;
import cn.kstry.framework.core.util.GlobalUtil;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import reactor.core.publisher.Mono;

/* loaded from: input_file:cn/kstry/framework/core/engine/StoryEngine.class */
public class StoryEngine {
    private static final Logger LOGGER = LoggerFactory.getLogger(StoryEngine.class);
    private final StartEventContainer startEventContainer;
    private final ThreadPoolExecutor asyncThreadPool;
    private final TaskContainer taskContainer;
    private final BusinessRoleRepository businessRoleRepository;
    private final Function<MethodWrapper.ParamInjectDef, Object> paramInitStrategy;

    public StoryEngine(BusinessRoleRepository businessRoleRepository, StartEventContainer startEventContainer, ThreadPoolExecutor threadPoolExecutor, TaskContainer taskContainer, Function<MethodWrapper.ParamInjectDef, Object> function) {
        AssertUtil.notNull(businessRoleRepository);
        AssertUtil.notNull(taskContainer);
        AssertUtil.notNull(threadPoolExecutor);
        AssertUtil.notNull(function);
        AssertUtil.notNull(startEventContainer);
        this.businessRoleRepository = businessRoleRepository;
        this.asyncThreadPool = threadPoolExecutor;
        this.taskContainer = taskContainer;
        this.paramInitStrategy = function;
        this.startEventContainer = startEventContainer;
    }

    public <T> TaskResponse<T> fire(StoryRequest<T> storyRequest) {
        try {
            try {
                MDC.put(GlobalProperties.KSTRY_STORY_REQUEST_ID_NAME, GlobalUtil.getOrSetRequestId(storyRequest));
                preProcessing(storyRequest);
                TaskResponse<T> doFire = doFire(storyRequest);
                MDC.clear();
                return doFire;
            } catch (KstryException e) {
                TaskResponse<T> buildError = TaskResponseBox.buildError(e.getErrorCode(), e.getMessage());
                ((TaskResponseBox) GlobalUtil.transferNotEmpty(buildError, TaskResponseBox.class)).setResultException(e);
                LOGGER.warn(e.getMessage(), e);
                MDC.clear();
                return buildError;
            } catch (Exception e2) {
                TaskResponse<T> buildError2 = TaskResponseBox.buildError(ExceptionEnum.SYSTEM_ERROR.getExceptionCode(), ExceptionEnum.SYSTEM_ERROR.getDesc());
                ((TaskResponseBox) GlobalUtil.transferNotEmpty(buildError2, TaskResponseBox.class)).setResultException(e2);
                LOGGER.warn(e2.getMessage(), e2);
                MDC.clear();
                return buildError2;
            }
        } catch (Throwable th) {
            MDC.clear();
            throw th;
        }
    }

    public <T> Mono<T> fireAsync(StoryRequest<T> storyRequest) {
        try {
            MDC.put(GlobalProperties.KSTRY_STORY_REQUEST_ID_NAME, GlobalUtil.getOrSetRequestId(storyRequest));
            preProcessing(storyRequest);
            Mono<T> doFireAsync = doFireAsync(storyRequest);
            MDC.clear();
            return doFireAsync;
        } catch (Throwable th) {
            MDC.clear();
            throw th;
        }
    }

    public <T> Mono<T> doFireAsync(StoryRequest<T> storyRequest) {
        FlowRegister flowRegister = getFlowRegister(storyRequest);
        AsyncTaskCell asyncTaskCell = flowRegister.getAsyncTaskCell();
        CompletableFuture<AsyncTaskState> initResultFuture = asyncTaskCell.initResultFuture();
        StoryBus submitTaskGetBus = submitTaskGetBus(storyRequest, flowRegister);
        return Mono.fromFuture(initResultFuture.whenComplete((asyncTaskState, th) -> {
            try {
                MDC.put(GlobalProperties.KSTRY_STORY_REQUEST_ID_NAME, flowRegister.getRequestId());
                if (th != null && !asyncTaskCell.isCancelled()) {
                    asyncTaskCell.cancel();
                    LOGGER.warn(th.getMessage(), th);
                    flowRegister.getMonitorTracking().trackingLog();
                    Optional.ofNullable(storyRequest.getRecallStoryHook()).ifPresent(consumer -> {
                        consumer.accept(new RecallStory(th, submitTaskGetBus));
                    });
                }
            } catch (Exception e) {
                LOGGER.warn(e.getMessage(), e);
            } finally {
                MDC.clear();
            }
        }).thenCompose(asyncTaskState2 -> {
            try {
                try {
                    MDC.put(GlobalProperties.KSTRY_STORY_REQUEST_ID_NAME, flowRegister.getRequestId());
                    Class<?> returnType = storyRequest.getReturnType();
                    Object obj = null;
                    if (returnType != null && submitTaskGetBus.getResult() != null) {
                        obj = submitTaskGetBus.getResult();
                        AssertUtil.isTrue(Boolean.valueOf(ElementParserUtil.isAssignable(returnType, obj.getClass())), ExceptionEnum.TYPE_TRANSFER_ERROR, "Engine asyncFire. result type conversion error! expect: {}, actual: {}", returnType.getName(), obj.getClass().getName());
                    }
                    Optional.ofNullable(storyRequest.getRecallStoryHook()).ifPresent(consumer -> {
                        consumer.accept(new RecallStory(submitTaskGetBus));
                    });
                    CompletableFuture completedFuture = CompletableFuture.completedFuture(obj);
                    flowRegister.getMonitorTracking().trackingLog();
                    MDC.clear();
                    return completedFuture;
                } catch (Exception e) {
                    LOGGER.warn(e.getMessage(), e);
                    Optional.ofNullable(storyRequest.getRecallStoryHook()).ifPresent(consumer2 -> {
                        consumer2.accept(new RecallStory(e, submitTaskGetBus));
                    });
                    CompletableFuture completableFuture = new CompletableFuture();
                    completableFuture.completeExceptionally(e);
                    flowRegister.getMonitorTracking().trackingLog();
                    MDC.clear();
                    return completableFuture;
                }
            } catch (Throwable th2) {
                flowRegister.getMonitorTracking().trackingLog();
                MDC.clear();
                throw th2;
            }
        })).timeout(Duration.ofMillis(((Integer) Optional.ofNullable(storyRequest.getTimeout()).orElse(Integer.valueOf(GlobalProperties.ASYNC_TASK_DEFAULT_TIMEOUT))).intValue()), Mono.fromSupplier(() -> {
            try {
                try {
                    MDC.put(GlobalProperties.KSTRY_STORY_REQUEST_ID_NAME, flowRegister.getRequestId());
                    asyncTaskCell.cancel();
                    flowRegister.getMonitorTracking().trackingLog();
                    Object orElse = Optional.ofNullable(storyRequest.getMonoTimeoutFallback()).map((v0) -> {
                        return v0.get();
                    }).orElse(null);
                    Optional.ofNullable(storyRequest.getRecallStoryHook()).ifPresent(consumer -> {
                        consumer.accept(new RecallStory(submitTaskGetBus));
                    });
                    MDC.clear();
                    return orElse;
                } catch (Exception e) {
                    LOGGER.warn(e.getMessage(), e);
                    throw e;
                }
            } catch (Throwable th2) {
                MDC.clear();
                throw th2;
            }
        }));
    }

    private <T> TaskResponse<T> doFire(StoryRequest<T> storyRequest) {
        FlowRegister flowRegister = getFlowRegister(storyRequest);
        StoryBus submitTaskGetBus = submitTaskGetBus(storyRequest, flowRegister);
        try {
            try {
                Optional<KstryException> optional = flowRegister.getAsyncTaskCell().get(((Integer) Optional.ofNullable(storyRequest.getTimeout()).orElse(Integer.valueOf(GlobalProperties.ASYNC_TASK_DEFAULT_TIMEOUT))).intValue(), TimeUnit.MILLISECONDS);
                if (optional.isPresent()) {
                    throw optional.get();
                }
                Object obj = null;
                Class<?> returnType = storyRequest.getReturnType();
                if (returnType != null && submitTaskGetBus.getResult() != null) {
                    obj = submitTaskGetBus.getResult();
                    AssertUtil.isTrue(Boolean.valueOf(ElementParserUtil.isAssignable(returnType, obj.getClass())), ExceptionEnum.TYPE_TRANSFER_ERROR, "Engine fire. result type conversion error! expect: {}, actual: {}", returnType.getName(), obj.getClass().getName());
                }
                Optional.ofNullable(storyRequest.getRecallStoryHook()).ifPresent(consumer -> {
                    consumer.accept(new RecallStory(submitTaskGetBus));
                });
                TaskResponse<T> buildSuccess = TaskResponseBox.buildSuccess(obj);
                flowRegister.getMonitorTracking().trackingLog();
                return buildSuccess;
            } catch (Exception e) {
                Optional.ofNullable(storyRequest.getRecallStoryHook()).ifPresent(consumer2 -> {
                    consumer2.accept(new RecallStory(e, submitTaskGetBus));
                });
                throw e;
            }
        } catch (Throwable th) {
            flowRegister.getMonitorTracking().trackingLog();
            throw th;
        }
    }

    public void destroy() {
        if (this.asyncThreadPool != null) {
            LOGGER.info("begin shutdown time slot thread pool! active count: {}", Integer.valueOf(this.asyncThreadPool.getActiveCount()));
            this.asyncThreadPool.shutdown();
            try {
                TimeUnit.MILLISECONDS.sleep(GlobalProperties.ENGINE_SHUTDOWN_SLEEP_SECONDS);
            } catch (Exception e) {
                LOGGER.warn("time slot thread pool close task are interrupted on shutdown!", e);
            }
            if (this.asyncThreadPool.isShutdown() && this.asyncThreadPool.getActiveCount() == 0) {
                LOGGER.info("[shutdown] interrupting tasks in the thread pool success! thread pool close success!");
                return;
            }
            LOGGER.info("interrupting tasks in the thread pool that have not yet finished! begin shutdownNow! active count: {}", Integer.valueOf(this.asyncThreadPool.getActiveCount()));
            this.asyncThreadPool.shutdownNow();
            try {
                TimeUnit.MILLISECONDS.sleep(GlobalProperties.ENGINE_SHUTDOWN_NOW_SLEEP_SECONDS);
            } catch (Exception e2) {
                LOGGER.warn("time slot thread pool close task are interrupted on shutdown!", e2);
            }
            if (this.asyncThreadPool.isShutdown() && this.asyncThreadPool.getActiveCount() == 0) {
                LOGGER.info("[shutdownNow] interrupting tasks in the thread pool success! thread pool close success!");
            } else {
                LOGGER.error("[shutdownNow] interrupting tasks in the thread pool error! thread pool close error!");
            }
        }
    }

    private <T> StoryBus submitTaskGetBus(StoryRequest<T> storyRequest, FlowRegister flowRegister) {
        Role role = storyRequest.getRole();
        BasicStoryBus basicStoryBus = new BasicStoryBus(storyRequest.getBusinessId(), role, flowRegister.getMonitorTracking(), storyRequest.getRequest(), storyRequest.getVarScopeData(), storyRequest.getStaScopeData());
        AsyncTaskForkJoin asyncTaskForkJoin = new AsyncTaskForkJoin();
        asyncTaskForkJoin.setRole(role);
        asyncTaskForkJoin.setStoryBus(basicStoryBus);
        asyncTaskForkJoin.setTaskContainer(this.taskContainer);
        asyncTaskForkJoin.setParamInitStrategy(this.paramInitStrategy);
        asyncTaskForkJoin.setAsyncThreadPool(this.asyncThreadPool);
        asyncTaskForkJoin.submitTask(flowRegister);
        return basicStoryBus;
    }

    private <T> FlowRegister getFlowRegister(StoryRequest<T> storyRequest) {
        String startId = storyRequest.getStartId();
        AssertUtil.notBlank(startId, ExceptionEnum.PARAMS_ERROR, "StartId is not allowed to be empty!");
        return new FlowRegister(this.startEventContainer.getStartEventById(startId).orElseThrow(() -> {
            return KstryException.buildException(null, ExceptionEnum.PARAMS_ERROR, GlobalUtil.format("StartId did not match a valid StartEvent! startId: {}", startId));
        }), storyRequest);
    }

    private <T> void preProcessing(StoryRequest<T> storyRequest) {
        String startId = storyRequest.getStartId();
        Role role = storyRequest.getRole();
        if (StringUtils.isNotBlank(startId) && role == null) {
            Optional<Role> role2 = this.businessRoleRepository.getRole(storyRequest.getBusinessId(), startId);
            storyRequest.getClass();
            role2.ifPresent(storyRequest::setRole);
        }
    }
}
