package com.picoff.commons.exec;

import com.picoff.commons.functional.Handler;
import com.picoff.commons.functional.ProcedureResultHandler;
import java.util.List;
import java.util.WeakHashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/picoff/commons/exec/BasicAsyncExecutor.class */
/**
 * {@link AsyncExecutor} backed by a fixed-size set of daemon worker threads that
 * poll a caller-supplied {@link BlockingQueue} of {@link AsyncTask}s.
 *
 * <p>A separate monitor thread wakes up every {@link #TICK_PARK_NANOS} nanoseconds,
 * drops workers that are no longer alive and starts replacements so the pool is
 * kept at its target size until {@link #signalShutdown()} is called.
 *
 * <p>Workers poll the queue without blocking: while tasks arrived recently they
 * re-poll almost immediately, and after {@code windDownScale} nanoseconds without
 * work they back off to parking for {@code windDownDelay} nanoseconds per poll.
 */
public class BasicAsyncExecutor implements AsyncExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(BasicAsyncExecutor.class);
    /** Placeholder value; {@link #workers} is only used as a weak set of threads. */
    private static final Object O = new Object();
    /** Shared, stack-trace-free exception handed to callers whose task is rejected. */
    private static final RejectedExecutionException CAUSE_REJECTED = new RejectedExecutionException();
    /** Pause between two passes of the monitor loop. */
    private static final long TICK_PARK_NANOS = TimeUnit.MILLISECONDS.toNanos(100);

    static {
        // The instance is shared by every rejection, so a captured stack trace would be misleading.
        CAUSE_REJECTED.setStackTrace(new StackTraceElement[0]);
    }

    private final ThreadFactory threadFactory;
    private final BlockingQueue<AsyncTask> tasks;
    private final Thread monitor;
    private final String poolPrefix;
    /** Worker threads, weakly referenced. Mutated by the constructor and then only by the monitor thread. */
    private final WeakHashMap<Thread, Object> workers = new WeakHashMap<>();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final AtomicInteger threadId = new AtomicInteger();
    /** How long (ns) after its last task a worker keeps re-polling at the shortest interval. */
    private final long windDownScale = TimeUnit.MILLISECONDS.toNanos(100);
    /** Park time (ns) between polls once a worker has been idle longer than {@link #windDownScale}. */
    private final long windDownDelay = TimeUnit.MICROSECONDS.toNanos(500);

    /**
     * Creates the executor, starts its workers and monitor thread, and registers a
     * JVM shutdown hook that calls {@link #signalShutdown()}.
     *
     * @param i             target number of worker threads
     * @param blockingQueue queue the workers poll and {@link #submit} offers to
     * @param str           {@link String#format} pattern for worker thread names; it is
     *                      formatted with a single integer argument (the worker's sequence number)
     */
    public BasicAsyncExecutor(int i, BlockingQueue<AsyncTask> blockingQueue, String str) {
        this.poolPrefix = str;
        this.threadFactory = runnable -> {
            AsyncReferenceThread worker = new AsyncReferenceThread(runnable, this);
            worker.setDaemon(true);
            worker.setName(String.format(str, Integer.valueOf(this.threadId.incrementAndGet())));
            return worker;
        };
        this.tasks = blockingQueue;
        spawnWorkers(i);
        // NOTE(review): the monitor's daemon flag is not set here; unless AsyncReferenceThread
        // sets it, a non-daemon monitor keeps the JVM alive until signalShutdown() is called — confirm.
        this.monitor = new AsyncReferenceThread(() -> monitorWorker(i), this);
        this.monitor.start();
        // NOTE(review): the hook holds a strong reference to this executor until JVM exit.
        Runtime.getRuntime().addShutdownHook(new Thread(this::signalShutdown));
    }

    /**
     * Body of the monitor thread: prunes dead workers and refills the pool once per
     * tick until shutdown is signalled or the monitor thread is interrupted.
     *
     * @param targetWorkers number of workers the pool should hold
     */
    private void monitorWorker(int targetWorkers) {
        do {
            this.workers.keySet().removeIf(worker -> !worker.isAlive());
            spawnWorkers(targetWorkers);
            LockSupport.parkNanos(TICK_PARK_NANOS);
        } while (!this.shutdown.get() && !this.monitor.isInterrupted());
        this.monitor.interrupt();
    }

    /**
     * Starts as many new workers as are needed to bring the pool up to
     * {@code targetWorkers}; does nothing when the pool is already that large.
     *
     * @param targetWorkers number of workers the pool should hold
     */
    private void spawnWorkers(int targetWorkers) {
        int missing = targetWorkers - this.workers.size();
        for (int n = 0; n < missing; n++) {
            Thread worker = this.threadFactory.newThread(this::workerWork);
            worker.setUncaughtExceptionHandler(this::handleThreadException);
            worker.start();
            this.workers.put(worker, O);
        }
    }

    /** Logs an exception that escaped a worker thread. */
    private void handleThreadException(Thread thread, Throwable th) {
        LOGGER.error("Uncaught exception in async executor worker thread {}", thread.getName(), th);
    }

    /**
     * Body of every worker thread: polls the queue and runs tasks until shutdown is
     * signalled (or the thread is interrupted) and the queue is observed empty.
     */
    private void workerWork() {
        long busyUntil = System.nanoTime();
        while (true) {
            AsyncTask task = this.tasks.poll();
            if (task == null) {
                if (stopRequested()) {
                    // Queue is empty and no more work is expected: let the thread end
                    // instead of polling (or, when interrupted, spinning) forever.
                    return;
                }
                // Difference-based comparison stays correct if nanoTime wraps around.
                boolean windingDown = System.nanoTime() - busyUntil > 0;
                LockSupport.parkNanos(windingDown ? this.windDownDelay : 1L);
                continue;
            }
            busyUntil = System.nanoTime() + this.windDownScale;
            runTask(task);
            if (stopRequested()) {
                // Brief pause so a task offered just before shutdown can still be seen.
                LockSupport.parkNanos(1000L);
                if (this.tasks.peek() == null) {
                    return;
                }
            }
        }
    }

    /** Returns whether this worker should stop once the queue is drained. */
    private boolean stopRequested() {
        return this.shutdown.get() || Thread.currentThread().isInterrupted();
    }

    /**
     * Runs one task. Anything the task throws is reported to its completion handler;
     * a failure of that handler is logged so the worker thread survives.
     */
    private void runTask(AsyncTask task) {
        try {
            task.run();
        } catch (Throwable taskFailure) {
            try {
                task.onComplete.fail(taskFailure);
            } catch (Throwable handlerFailure) {
                LOGGER.error("Completion handler failed while reporting an async task failure", handlerFailure);
            }
        }
    }

    /**
     * Marks the executor as shut down. New submissions are rejected from then on,
     * the monitor stops refilling the pool, and workers exit once the queue is empty.
     * Only the first call has an effect.
     */
    @Override // com.picoff.commons.exec.AsyncExecutor
    public void signalShutdown() {
        // compareAndSet makes the transition (and the log line) happen exactly once.
        if (this.shutdown.compareAndSet(false, true)) {
            LOGGER.info("Gracefully shutting down executor {}", this);
        }
    }

    /**
     * Queues a task for asynchronous execution.
     *
     * <p>If the executor has been shut down, or the queue does not accept the task,
     * {@code procedureResultHandler} is failed with a {@link RejectedExecutionException}
     * on the calling thread.
     *
     * @param handler                the work to run
     * @param procedureResultHandler completion handler for the task
     */
    @Override // com.picoff.commons.exec.AsyncExecutor
    public void submit(Handler<ProcedureResultHandler> handler, ProcedureResultHandler procedureResultHandler) {
        boolean accepted = !this.shutdown.get()
                && this.tasks.offer(new AsyncTask(handler, procedureResultHandler));
        if (!accepted) {
            procedureResultHandler.fail(CAUSE_REJECTED);
        }
    }

    /** Returns the thread that supervises the worker pool. */
    public Thread getMonitor() {
        return this.monitor;
    }

    @Override
    public String toString() {
        return String.format("BasicAsyncExecutor{poolPrefix='%s'}", this.poolPrefix);
    }
}
