package code.ponfee.commons.concurrent;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:code/ponfee/commons/concurrent/AsyncBatchProcessor.class */
public final class AsyncBatchProcessor<T> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchProcessor.class);
    private final AsyncBatchThread<T> async;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:code/ponfee/commons/concurrent/AsyncBatchProcessor$AsyncBatchThread.class */
    public static class AsyncBatchThread<T> extends Thread {
        private static final int MINIMUM_PERIOD_TIME_MILLIS = 9;
        private final LinkedBlockingQueue<T> queue;
        private final AtomicBoolean stopped;
        private final BatchProcessor<T> processor;
        private final int periodTimeMillis;
        private final int sleepTimeMillis;
        private final int batchSize;
        private final int asyncExecuteThreshold;
        private final int maximumPoolSize;
        private long nextRefreshTimeMillis;

        private AsyncBatchThread(BatchProcessor<T> batchProcessor, int i, int i2, int i3) {
            this.queue = new LinkedBlockingQueue<>();
            this.stopped = new AtomicBoolean(false);
            this.nextRefreshTimeMillis = 0L;
            Assert.isTrue(i >= MINIMUM_PERIOD_TIME_MILLIS, "Period time millis must greater than 9, but actual " + i);
            Assert.isTrue(i2 > 0, "Batch size cannot negative number.");
            Assert.isTrue(i3 > 0, "Maximum pool size cannot negative number.");
            this.processor = batchProcessor;
            this.periodTimeMillis = i;
            this.sleepTimeMillis = i >>> 1;
            this.batchSize = i2;
            this.asyncExecuteThreshold = i2 + (i2 >>> 1);
            this.maximumPoolSize = i3;
            super.setName("async-batch-processor-thread-" + Integer.toHexString(hashCode()));
            super.setDaemon(false);
            super.start();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            ThreadPoolExecutor threadPoolExecutor = null;
            ArrayList arrayList = new ArrayList(this.batchSize);
            int i = this.batchSize;
            while (true) {
                if (isEnd()) {
                    if (threadPoolExecutor != null) {
                        try {
                            Thread.sleep(this.periodTimeMillis);
                        } catch (InterruptedException e) {
                            AsyncBatchProcessor.LOG.error("Thread#sleep occur error.", e);
                            Thread.currentThread().interrupt();
                        }
                    }
                    if (isEnd()) {
                        break;
                    }
                }
                if (!this.queue.isEmpty() && i > 0) {
                    i -= this.queue.drainTo(arrayList, i);
                }
                long currentTimeMillis = System.currentTimeMillis();
                if (i == 0 || (!arrayList.isEmpty() && (this.stopped.get() || currentTimeMillis >= this.nextRefreshTimeMillis))) {
                    if (threadPoolExecutor == null && i == 0 && this.queue.size() > this.asyncExecuteThreshold) {
                        threadPoolExecutor = ThreadPoolExecutors.create(1, this.maximumPoolSize, 300L, 2, "async-batch-processor-worker", ThreadPoolExecutors.ALWAYS_CALLER_RUNS);
                        AsyncBatchProcessor.LOG.info("Asnyc batch processor created thread pool executor: {}", new ThreadPoolMonitor(threadPoolExecutor));
                    }
                    if (threadPoolExecutor != null) {
                        ArrayList arrayList2 = arrayList;
                        threadPoolExecutor.submit(() -> {
                            this.processor.process(arrayList2, isEnd());
                        });
                        int i2 = this.batchSize;
                        i = i2;
                        arrayList = new ArrayList(i2);
                    } else {
                        this.processor.process(arrayList, isEnd());
                        arrayList.clear();
                        i = this.batchSize;
                    }
                    this.nextRefreshTimeMillis = currentTimeMillis + this.periodTimeMillis;
                } else if (this.stopped.get()) {
                    continue;
                } else {
                    try {
                        Thread.sleep(this.sleepTimeMillis);
                    } catch (InterruptedException e2) {
                        AsyncBatchProcessor.LOG.error("Thread#sleep occur error.", e2);
                        this.stopped.compareAndSet(false, true);
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
            if (threadPoolExecutor != null) {
                ThreadPoolExecutors.shutdown(threadPoolExecutor);
            }
        }

        private boolean isEnd() {
            return this.stopped.get() && this.queue.isEmpty();
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:code/ponfee/commons/concurrent/AsyncBatchProcessor$BatchProcessor.class */
    public interface BatchProcessor<T> {
        void process(List<T> list, boolean z);
    }

    public AsyncBatchProcessor(BatchProcessor<T> batchProcessor) {
        this(batchProcessor, 100, 200, 2);
    }

    public AsyncBatchProcessor(BatchProcessor<T> batchProcessor, int i, int i2, int i3) {
        this.async = new AsyncBatchThread<>(batchProcessor, i, i2, i3);
    }

    public boolean put(T t) {
        return !((AsyncBatchThread) this.async).stopped.get() && ((AsyncBatchThread) this.async).queue.offer(t);
    }

    public boolean put(T[] tArr) {
        if (((AsyncBatchThread) this.async).stopped.get() || tArr == null || tArr.length == 0) {
            return false;
        }
        for (T t : tArr) {
            if (!((AsyncBatchThread) this.async).queue.offer(t)) {
                return false;
            }
        }
        return true;
    }

    public boolean put(List<T> list) {
        if (((AsyncBatchThread) this.async).stopped.get() || list == null || list.isEmpty()) {
            return false;
        }
        Iterator<T> it = list.iterator();
        while (it.hasNext()) {
            if (!((AsyncBatchThread) this.async).queue.offer(it.next())) {
                return false;
            }
        }
        return true;
    }

    public boolean stop() {
        return ((AsyncBatchThread) this.async).stopped.compareAndSet(false, true);
    }

    public void stopAndAwait() throws InterruptedException {
        stop();
        while (!Threads.isStopped(this.async)) {
            Thread.sleep(((AsyncBatchThread) this.async).periodTimeMillis);
        }
    }
}
