package cn.kstry.framework.core.engine.thread;

import cn.kstry.framework.core.constant.GlobalProperties;
import cn.kstry.framework.core.container.ComponentLifecycle;
import cn.kstry.framework.core.engine.FlowRegister;
import cn.kstry.framework.core.engine.future.AdminFuture;
import cn.kstry.framework.core.engine.future.AdminTaskFuture;
import cn.kstry.framework.core.engine.future.FragmentFuture;
import cn.kstry.framework.core.engine.future.InvokeFuture;
import cn.kstry.framework.core.engine.future.MonoFlowFuture;
import cn.kstry.framework.core.enums.ExecutorType;
import cn.kstry.framework.core.exception.ExceptionEnum;
import cn.kstry.framework.core.exception.KstryException;
import cn.kstry.framework.core.util.AssertUtil;
import cn.kstry.framework.core.util.GlobalUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:cn/kstry/framework/core/engine/thread/TaskServiceExecutor.class */
public class TaskServiceExecutor implements TaskExecutor, ComponentLifecycle {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskServiceExecutor.class);
    private static final boolean SUPPORT_VIRTUAL_THREAD;
    private final ExecutorType executorType;
    private final ExecutorService executorService;
    private final String prefix;

    public TaskServiceExecutor(ExecutorType executorType, ExecutorService executorService, String str) {
        AssertUtil.anyNotNull(executorType, executorService);
        this.executorService = executorService;
        this.executorType = executorType;
        this.prefix = str == null ? "" : str;
    }

    public TaskServiceExecutor(ExecutorType executorType, int i, int i2, long j, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler, String str) {
        this(executorType, new ThreadPoolExecutor(i, i2, j, timeUnit, blockingQueue, threadFactory, rejectedExecutionHandler), str);
    }

    public static TaskServiceExecutor buildDefaultExecutor(ExecutorType executorType) {
        AssertUtil.notNull(executorType);
        if (GlobalProperties.KSTRY_OPEN_VIRTUAL_THREAD && SUPPORT_VIRTUAL_THREAD) {
            String format = GlobalUtil.format("kstry-{}-virtual-thread", executorType.name().toLowerCase());
            try {
                return getVirtualTaskServiceExecutor(executorType, format);
            } catch (Throwable th) {
                LOGGER.warn("TaskServiceExecutor getVirtualTaskServiceExecutor error. use ThreadPoolExecutor. name: {}", format, th);
            }
        }
        String format2 = GlobalUtil.format("kstry-{}-thread-pool", executorType.name().toLowerCase());
        return buildTaskExecutor(executorType, format2, GlobalProperties.THREAD_POOL_CORE_SIZE, GlobalProperties.THREAD_POOL_MAX_SIZE, GlobalProperties.THREAD_POOL_KEEP_ALIVE_TIME, TimeUnit.MINUTES, new LinkedBlockingQueue(GlobalProperties.KSTRY_THREAD_POOL_QUEUE_SIZE), new ThreadFactoryBuilder().setNameFormat(format2 + "-%d").build(), (runnable, threadPoolExecutor) -> {
            KstryException kstryException = new KstryException(ExceptionEnum.ASYNC_QUEUE_OVERFLOW);
            String name = runnable.getClass().getName();
            if (runnable instanceof Task) {
                name = ((Task) GlobalUtil.transferNotEmpty(runnable, Task.class)).getTaskName();
            }
            LOGGER.error(kstryException.getMessage() + " taskName: {}", name);
        });
    }

    public static TaskServiceExecutor buildDefaultExecutor(ExecutorType executorType, ExecutorService executorService, String str) {
        return new TaskServiceExecutor(executorType, executorService, str);
    }

    public static TaskServiceExecutor buildTaskExecutor(ExecutorType executorType, String str, int i, int i2, long j, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        return new TaskServiceExecutor(executorType, i, i2, j, timeUnit, blockingQueue, threadFactory, rejectedExecutionHandler, str);
    }

    @Override // cn.kstry.framework.core.engine.thread.TaskExecutor
    public AdminFuture submitAdminTask(ExecutorService executorService, MainFlowTask mainFlowTask) {
        AssertUtil.notNull(mainFlowTask);
        try {
            AdminTaskFuture adminTaskFuture = new AdminTaskFuture(mainFlowTask.buildTaskFuture(getActualExecutor(executorService).submit(mainFlowTask)));
            mainFlowTask.setAdminFuture(adminTaskFuture);
            mainFlowTask.openSwitch();
            return adminTaskFuture;
        } catch (Throwable th) {
            mainFlowTask.openSwitch();
            throw th;
        }
    }

    @Override // cn.kstry.framework.core.engine.thread.TaskExecutor
    public void submitFragmentTask(ExecutorService executorService, FragmentTask fragmentTask) {
        AssertUtil.notNull(fragmentTask);
        try {
            FragmentFuture buildTaskFuture = fragmentTask.buildTaskFuture(getActualExecutor(executorService).submit(fragmentTask));
            FlowRegister flowRegister = fragmentTask.getFlowRegister();
            flowRegister.getAdminFuture().addManagedFuture(buildTaskFuture, flowRegister.getStartEventId());
            fragmentTask.openSwitch();
        } catch (Throwable th) {
            fragmentTask.openSwitch();
            throw th;
        }
    }

    @Override // cn.kstry.framework.core.engine.thread.TaskExecutor
    public void submitMonoFlowTask(ExecutorService executorService, String str, MonoFlowTask monoFlowTask) {
        AssertUtil.notNull(monoFlowTask);
        try {
            MonoFlowFuture buildTaskFuture = monoFlowTask.buildTaskFuture(getActualExecutor(executorService).submit(monoFlowTask));
            FlowRegister flowRegister = monoFlowTask.getFlowRegister();
            flowRegister.getAdminFuture().addManagedFuture(str, buildTaskFuture, flowRegister.getStartEventId());
            monoFlowTask.openSwitch();
        } catch (Throwable th) {
            monoFlowTask.openSwitch();
            throw th;
        }
    }

    @Override // cn.kstry.framework.core.engine.thread.TaskExecutor
    public InvokeFuture submitMethodInvokeTask(ExecutorService executorService, MethodInvokeTask methodInvokeTask) {
        AssertUtil.notNull(methodInvokeTask);
        try {
            InvokeFuture buildTaskFuture = methodInvokeTask.buildTaskFuture(getActualExecutor(executorService).submit(methodInvokeTask));
            FlowRegister flowRegister = methodInvokeTask.getFlowRegister();
            flowRegister.getAdminFuture().addManagedFuture(buildTaskFuture, flowRegister.getStartEventId());
            methodInvokeTask.openSwitch();
            return buildTaskFuture;
        } catch (Throwable th) {
            methodInvokeTask.openSwitch();
            throw th;
        }
    }

    @Override // cn.kstry.framework.core.container.ComponentLifecycle
    public void destroy() {
        if (!(this.executorService instanceof ThreadPoolExecutor)) {
            LOGGER.info("Begin shutdown time slot thread pool! name: {}", this.prefix);
            this.executorService.shutdown();
            try {
                TimeUnit.MILLISECONDS.sleep(GlobalProperties.ENGINE_SHUTDOWN_SLEEP_SECONDS);
            } catch (Throwable th) {
                LOGGER.warn("Time slot thread pool close task are interrupted on shutdown! name: {}", this.prefix, th);
            }
            if (this.executorService.isShutdown()) {
                LOGGER.info("[shutdown] interrupting tasks in the thread pool success! thread pool close success! name: {}", this.prefix);
                return;
            }
            LOGGER.info("Interrupting tasks in the thread pool that have not yet finished! begin shutdownNow! name: {}", this.prefix);
            this.executorService.shutdownNow();
            try {
                TimeUnit.MILLISECONDS.sleep(GlobalProperties.ENGINE_SHUTDOWN_NOW_SLEEP_SECONDS);
            } catch (Throwable th2) {
                LOGGER.warn("time slot thread pool close task are interrupted on shutdown! name: {}", this.prefix, th2);
            }
            if (this.executorService.isShutdown()) {
                LOGGER.info("[shutdownNow] interrupting tasks in the thread pool success! thread pool close success! name: {}", this.prefix);
                return;
            } else {
                LOGGER.error("[shutdownNow] interrupting tasks in the thread pool error! thread pool close error! name: {}", this.prefix);
                return;
            }
        }
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) this.executorService;
        LOGGER.info("Begin shutdown time slot thread pool! name: {}, active count: {}", this.prefix, Integer.valueOf(threadPoolExecutor.getActiveCount()));
        threadPoolExecutor.shutdown();
        try {
            TimeUnit.MILLISECONDS.sleep(GlobalProperties.ENGINE_SHUTDOWN_SLEEP_SECONDS);
        } catch (Throwable th3) {
            LOGGER.warn("Time slot thread pool close task are interrupted on shutdown! name: {}", this.prefix, th3);
        }
        if (threadPoolExecutor.isShutdown() && threadPoolExecutor.getActiveCount() == 0) {
            LOGGER.info("[shutdown] interrupting tasks in the thread pool success! thread pool close success!, name: {}", this.prefix);
            return;
        }
        LOGGER.info("Interrupting tasks in the thread pool that have not yet finished! begin shutdownNow! name: {}, active count: {}", this.prefix, Integer.valueOf(threadPoolExecutor.getActiveCount()));
        threadPoolExecutor.shutdownNow();
        try {
            TimeUnit.MILLISECONDS.sleep(GlobalProperties.ENGINE_SHUTDOWN_NOW_SLEEP_SECONDS);
        } catch (Throwable th4) {
            LOGGER.warn("time slot thread pool close task are interrupted on shutdown! name: {}", this.prefix, th4);
        }
        if (threadPoolExecutor.isShutdown() && threadPoolExecutor.getActiveCount() == 0) {
            LOGGER.info("[shutdownNow] interrupting tasks in the thread pool success! thread pool close success! name: {}", this.prefix);
        } else {
            LOGGER.error("[shutdownNow] interrupting tasks in the thread pool error! thread pool close error! name: {}", this.prefix);
        }
    }

    @Override // cn.kstry.framework.core.engine.thread.TaskExecutor
    public ExecutorType getExecutorType() {
        return this.executorType;
    }

    @Override // cn.kstry.framework.core.engine.thread.TaskExecutor
    public String getPrefix() {
        return this.prefix;
    }

    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    private ExecutorService getActualExecutor(ExecutorService executorService) {
        return executorService == null ? this.executorService : executorService;
    }

    private static TaskServiceExecutor getVirtualTaskServiceExecutor(ExecutorType executorType, String str) throws Exception {
        return buildDefaultExecutor(executorType, (ExecutorService) MethodUtils.invokeStaticMethod(Executors.class, "newThreadPerTaskExecutor", new Object[]{(ThreadFactory) MethodUtils.invokeMethod(MethodUtils.invokeMethod(MethodUtils.invokeMethod(MethodUtils.invokeStaticMethod(Thread.class, "ofVirtual", new Object[0]), "name", new Object[]{str + "-", 0}), "uncaughtExceptionHandler", new Object[]{(thread, th) -> {
            LOGGER.info("[{}] thread uncaught exception. name: {}", new Object[]{ExceptionEnum.ASYNC_TASK_ERROR, thread.getName(), th});
        }}), "factory")}), str);
    }

    static {
        SUPPORT_VIRTUAL_THREAD = ClassUtils.isPresent("java.lang.VirtualThread", TaskServiceExecutor.class.getClassLoader()) && ClassUtils.isPresent("java.util.concurrent.ThreadPerTaskExecutor", TaskServiceExecutor.class.getClassLoader());
    }
}
