package cn.ponfee.disjob.common.concurrent;

import cn.ponfee.disjob.common.exception.Throwables;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/ponfee/disjob/common/concurrent/AsyncDelayedExecutor.class */
/**
 * A self-starting thread that drains a {@link DelayQueue} and hands every expired
 * element's payload to a user supplied {@link Consumer}.
 *
 * <p>When constructed with a maximum pool size greater than {@code 1}, payloads are
 * submitted to an internal {@link ThreadPoolExecutor}; otherwise they are processed
 * directly on this thread.
 *
 * <p>The thread is started by the constructor, so an instance is already running when
 * the constructor returns.
 *
 * @param <E> the type of the payload carried by the delayed elements
 */
public final class AsyncDelayedExecutor<E> extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncDelayedExecutor.class);

    /** Longest time, in milliseconds, one poll blocks before the stop flag is checked again. */
    private static final long POLL_TIMEOUT_MILLIS = 3000L;

    /** Callback invoked with the payload of each expired element. */
    private final Consumer<E> processor;

    /** Worker pool used for processing, or {@code null} when processing happens on this thread. */
    private final ThreadPoolExecutor asyncExecutor;

    /** Pending elements, released by the queue once their delay has expired. */
    private final DelayQueue<DelayedData<E>> queue = new DelayQueue<>();

    /** Set once a stop was requested or the polling loop has terminated. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    /**
     * Creates and starts an executor that processes payloads on this thread
     * (equivalent to a maximum pool size of {@code 1}).
     *
     * @param consumer the callback invoked with each expired payload
     */
    public AsyncDelayedExecutor(Consumer<E> consumer) {
        this(1, consumer);
    }

    /**
     * Creates and starts an executor.
     *
     * @param i        the maximum number of worker threads; a value greater than {@code 1}
     *                 enables the internal worker pool, any other value makes this thread
     *                 process the payloads itself
     * @param consumer the callback invoked with each expired payload
     */
    public AsyncDelayedExecutor(int i, Consumer<E> consumer) {
        this.processor = consumer;
        this.asyncExecutor = i > 1 ? createAsyncExecutor(i) : null;

        super.setName("async_delayed_executor-" + Integer.toHexString(hashCode()));
        super.setDaemon(false);
        super.setUncaughtExceptionHandler(LoggedUncaughtExceptionHandler.INSTANCE);
        // Every field is assigned above, so the running thread never observes a half-built instance.
        super.start();
    }

    /**
     * Builds the worker pool: one core thread, up to {@code maximumPoolSize} threads,
     * a {@link SynchronousQueue} as work queue, a keep-alive of 300 seconds and the
     * {@code ThreadPoolExecutors.CALLER_RUNS} rejected handler.
     */
    private static ThreadPoolExecutor createAsyncExecutor(int maximumPoolSize) {
        return ThreadPoolExecutors.builder()
            .corePoolSize(1)
            .maximumPoolSize(maximumPoolSize)
            .workQueue(new SynchronousQueue<>())
            .keepAliveTimeSeconds(300L)
            .threadFactory(NamedThreadFactory.builder().prefix("async_delayed_worker").build())
            .rejectedHandler(ThreadPoolExecutors.CALLER_RUNS)
            .build();
    }

    /**
     * Enqueues a delayed element for later processing.
     *
     * @param delayedData the element to enqueue
     * @return {@code false} if this executor is stopped (the element is discarded),
     *         otherwise the result of {@link DelayQueue#offer(java.util.concurrent.Delayed)}
     */
    public boolean put(DelayedData<E> delayedData) {
        if (this.stopped.get()) {
            return false;
        }
        // Offer the element itself; it must not be cast to the queue type.
        return this.queue.offer(delayedData);
    }

    /**
     * Requests the polling loop to stop without waiting for the thread to finish.
     *
     * @return {@code true} if this call switched the state from running to stopped,
     *         {@code false} if the executor was already marked as stopped
     */
    public boolean toStop() {
        return this.stopped.compareAndSet(false, true);
    }

    /**
     * Marks this executor as stopped and then delegates to
     * {@code Threads.stopThread(this, 1000L)} to stop the thread.
     */
    public void doStop() {
        toStop();
        Threads.stopThread(this, 1000L);
    }

    /**
     * Polling loop: takes expired elements from the queue and dispatches their payload
     * until a stop is requested or the thread is interrupted. On exit the executor is
     * marked as stopped and the worker pool, if any, is shut down.
     */
    @Override
    public void run() {
        try {
            while (!this.stopped.get()) {
                if (super.isInterrupted()) {
                    LOG.error("Async delayed thread interrupted.");
                    break;
                }
                try {
                    DelayedData<E> delayed = this.queue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                    if (delayed != null) {
                        dispatch(delayed.getData());
                    }
                } catch (InterruptedException e) {
                    LOG.error("Delayed queue pool interrupted.", e);
                    toStop();
                    // Restore the interrupt status for whoever inspects this thread afterwards.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            // Whatever ended the loop, nothing drains the queue any more: reject further
            // put() calls instead of silently accepting elements that would never run.
            toStop();
            if (this.asyncExecutor != null) {
                ThreadPoolExecutors.shutdown(this.asyncExecutor, 1);
            }
        }
    }

    /**
     * Hands one payload to the processor, through the worker pool when one is configured,
     * otherwise directly on the calling thread.
     */
    private void dispatch(E data) {
        // NOTE(review): ThrowingRunnable.caught/execute presumably keep a processor failure
        // from propagating to the caller - confirm against Throwables.
        if (this.asyncExecutor != null) {
            this.asyncExecutor.submit(Throwables.ThrowingRunnable.caught(() -> this.processor.accept(data)));
        } else {
            Throwables.ThrowingRunnable.execute(() -> this.processor.accept(data));
        }
    }
}
