package cn.zhxu.toys.concurrent;

import cn.zhxu.toys.concurrent.ParallelScheduler;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/zhxu/toys/concurrent/ParallelTaskScheduler.class */
/**
 * A {@link ParallelScheduler} that pulls tasks from a provider in batches and runs them on a
 * backing {@link ThreadPoolExecutor}, capping how many task futures are outstanding at once.
 *
 * <p>The class also implements {@link ExecutorService}; every {@code ExecutorService} method
 * delegates directly to the backing pool.
 *
 * <p>{@link #setBatchSize(int)} and {@link #setVerbose(boolean)} may be called from any thread;
 * the values are published through {@code volatile} fields.
 */
public class ParallelTaskScheduler implements ParallelScheduler, ExecutorService {
    private static final Logger log = LoggerFactory.getLogger(ParallelTaskScheduler.class);

    /** Number of tasks requested from the provider per call; read once per {@code schedule} run. */
    private volatile int batchSize;

    /** Pool that runs the individual tasks and the asynchronous scheduling runs. */
    private final ThreadPoolExecutor exePool;

    /** Number of tasks currently inside {@code TaskExecutor.execute}, across all scheduling runs. */
    private final AtomicInteger totalTasks;

    /** When {@code true}, every task logs the running-task count before and after it executes. */
    private volatile boolean verbose;

    /** Creates a scheduler backed by a new {@link ScheduledThreadPoolExecutor} with 100 core threads. */
    public ParallelTaskScheduler() {
        this(100);
    }

    /**
     * Creates a scheduler backed by a new {@link ScheduledThreadPoolExecutor}.
     *
     * @param i core pool size passed to the {@code ScheduledThreadPoolExecutor} constructor
     */
    public ParallelTaskScheduler(int i) {
        this(new ScheduledThreadPoolExecutor(i));
    }

    /**
     * Creates a scheduler that runs its tasks on the given pool. The batch size starts at 20.
     *
     * @param threadPoolExecutor pool used for all task execution
     */
    public ParallelTaskScheduler(ThreadPoolExecutor threadPoolExecutor) {
        this.batchSize = 20;
        this.exePool = threadPoolExecutor;
        this.totalTasks = new AtomicInteger(0);
    }

    /**
     * Same as {@link #schedule(int, ParallelScheduler.TaskProvider, ParallelScheduler.TaskExecutor,
     * ParallelScheduler.Identify)} with no {@code identify} function, i.e. without de-duplication.
     */
    @Override // cn.zhxu.toys.concurrent.ParallelScheduler
    public <T> void schedule(int i, ParallelScheduler.TaskProvider<T> taskProvider, ParallelScheduler.TaskExecutor<T> taskExecutor) {
        schedule(i, taskProvider, taskExecutor, null);
    }

    /**
     * Fetches tasks batch by batch, runs them on the pool and blocks until the futures that are
     * still tracked at the end have completed.
     *
     * <p>The provider is called with a counter starting at 0 that is incremented per call
     * (presumably a page index) and the current batch size. Fetching stops when a batch is empty
     * or smaller than the batch size.
     *
     * <p>When {@code identify} is given, the largest id submitted so far is remembered and, once
     * that value is positive, tasks whose id is not greater than it are skipped in later batches.
     *
     * <p>NOTE: futures that finish while later tasks are still being queued are dropped from the
     * tracking list, so an exception thrown by such a task is not reported to the caller.
     *
     * @param i            maximum number of tracked task futures outstanding at any time
     * @param taskProvider source of task batches
     * @param taskExecutor callback that executes a single task
     * @param identify     maps a task to a numeric id, or {@code null} to disable the id filter
     * @throws IllegalArgumentException if there is at least one task to run and {@code i < 1}
     * @throws RuntimeException         if the calling thread is interrupted while waiting, or if
     *                                  a task that is still tracked at the end failed
     */
    @Override // cn.zhxu.toys.concurrent.ParallelScheduler
    public <T> void schedule(int i, ParallelScheduler.TaskProvider<T> taskProvider, ParallelScheduler.TaskExecutor<T> taskExecutor, ParallelScheduler.Identify<T> identify) {
        // Snapshot the batch size so a concurrent setBatchSize() cannot change paging mid-run.
        final int pageSize = this.batchSize;
        final List<Future<?>> pending = new LinkedList<>();
        int pageIndex = 0;
        long lastMaxId = 0;
        List<T> page = taskProvider.getTaskList(pageIndex, pageSize);
        while (!page.isEmpty()) {
            Stream<T> stream = page.stream();
            if (identify != null && lastMaxId > 0) {
                final long threshold = lastMaxId;
                stream = stream.filter(task -> identify.id(task) > threshold);
            }
            Queue<T> queue = toQueue(stream);
            if (!queue.isEmpty()) {
                if (identify != null) {
                    // Long.compare avoids the overflow/truncation of casting an id difference to int.
                    lastMaxId = identify.id(queue.stream()
                            .max((a, b) -> Long.compare(identify.id(a), identify.id(b)))
                            .get());
                }
                submitTasks(i, queue, taskExecutor, pending);
            }
            if (page.size() < pageSize) {
                // A short batch means the provider has no further data.
                break;
            }
            pageIndex++;
            page = taskProvider.getTaskList(pageIndex, pageSize);
        }
        waitDone(pending);
    }

    /**
     * Same as {@link #asyncSchedule(int, ParallelScheduler.TaskProvider,
     * ParallelScheduler.TaskExecutor, ParallelScheduler.Identify)} with no {@code identify} function.
     */
    @Override // cn.zhxu.toys.concurrent.ParallelScheduler
    public <T> Future<?> asyncSchedule(int i, ParallelScheduler.TaskProvider<T> taskProvider, ParallelScheduler.TaskExecutor<T> taskExecutor) {
        return asyncSchedule(i, taskProvider, taskExecutor, null);
    }

    /**
     * Runs {@code schedule(i, taskProvider, taskExecutor, identify)} on the backing pool.
     *
     * <p>NOTE: the scheduling run occupies one pool thread and blocks there while its tasks run
     * on the same pool. If every pool thread is occupied by such a run, no task can start.
     *
     * @return a future that completes when the scheduling run returns or throws
     */
    @Override // cn.zhxu.toys.concurrent.ParallelScheduler
    public <T> Future<?> asyncSchedule(int i, ParallelScheduler.TaskProvider<T> taskProvider, ParallelScheduler.TaskExecutor<T> taskExecutor, ParallelScheduler.Identify<T> identify) {
        return this.exePool.submit(() -> {
            schedule(i, taskProvider, taskExecutor, identify);
        });
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.Executor
    public void execute(Runnable runnable) {
        this.exePool.execute(runnable);
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.ExecutorService
    public void shutdown() {
        this.exePool.shutdown();
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.ExecutorService
    public List<Runnable> shutdownNow() {
        return this.exePool.shutdownNow();
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.ExecutorService
    public boolean isShutdown() {
        return this.exePool.isShutdown();
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.ExecutorService
    public boolean isTerminated() {
        return this.exePool.isTerminated();
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.ExecutorService
    public boolean awaitTermination(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.exePool.awaitTermination(j, timeUnit);
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.ExecutorService
    public <T> Future<T> submit(Callable<T> callable) {
        return this.exePool.submit(callable);
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.ExecutorService
    public <T> Future<T> submit(Runnable runnable, T t) {
        return this.exePool.submit(runnable, t);
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.ExecutorService
    public Future<?> submit(Runnable runnable) {
        return this.exePool.submit(runnable);
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.ExecutorService
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection) throws InterruptedException {
        return this.exePool.invokeAll(collection);
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.ExecutorService
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection, long j, TimeUnit timeUnit) throws InterruptedException {
        return this.exePool.invokeAll(collection, j, timeUnit);
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.ExecutorService
    public <T> T invokeAny(Collection<? extends Callable<T>> collection) throws InterruptedException, ExecutionException {
        return this.exePool.invokeAny(collection);
    }

    /** Delegates to the backing pool. */
    @Override // java.util.concurrent.ExecutorService
    public <T> T invokeAny(Collection<? extends Callable<T>> collection, long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
        return this.exePool.invokeAny(collection, j, timeUnit);
    }

    /** Drains the stream into a new FIFO queue, keeping the order in which elements are delivered. */
    private <T> Queue<T> toQueue(Stream<T> stream) {
        final Queue<T> queue = new LinkedList<>();
        stream.forEach(queue::add);
        return queue;
    }

    /**
     * Submits every task in {@code queue} to the pool while keeping at most {@code i} futures in
     * {@code list}. When the limit is reached, the method polls once per millisecond and drops
     * completed futures from {@code list} to make room.
     *
     * @param i            maximum size of {@code list}; must be at least 1
     * @param queue        tasks to submit; empty when the method returns normally
     * @param taskExecutor callback that executes a single task
     * @param list         tracking list of outstanding futures, shared across calls by the caller
     * @throws IllegalArgumentException if {@code i < 1} (the loop could otherwise never progress)
     * @throws RuntimeException         if the calling thread is interrupted while polling; the
     *                                  thread's interrupt flag is restored first
     */
    protected <T> void submitTasks(int i, Queue<T> queue, final ParallelScheduler.TaskExecutor<T> taskExecutor, List<Future<?>> list) {
        if (i < 1) {
            throw new IllegalArgumentException("max parallel task count must be at least 1, but was " + i);
        }
        while (!queue.isEmpty()) {
            while (list.size() < i && !queue.isEmpty()) {
                final T task = queue.poll();
                list.add(this.exePool.submit(() -> {
                    runTask(taskExecutor, task);
                }));
            }
            if (!queue.isEmpty()) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can still observe the interrupt.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e.getMessage(), e);
                }
                list.removeIf(Future::isDone);
            }
        }
    }

    /**
     * Executes one task and maintains the running-task counter. The counter is decremented in a
     * {@code finally} block so that it stays accurate even when the task throws.
     */
    private <T> void runTask(ParallelScheduler.TaskExecutor<T> taskExecutor, T task) {
        int started = this.totalTasks.incrementAndGet();
        if (this.verbose) {
            log.info("平行机：" + started + ", " + this.exePool.getActiveCount());
        }
        try {
            taskExecutor.execute(task);
        } finally {
            int remaining = this.totalTasks.decrementAndGet();
            if (this.verbose) {
                log.info("平行机：" + remaining + ", " + this.exePool.getActiveCount());
            }
        }
    }

    /**
     * Blocks until every future in {@code list} has completed.
     *
     * @throws RuntimeException wrapping the {@link ExecutionException} of the first failed task,
     *                          or the {@link InterruptedException} if the waiting thread is
     *                          interrupted (the interrupt flag is restored first)
     */
    private void waitDone(List<Future<?>> list) {
        for (Future<?> future : list) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e.getMessage(), e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }

    /**
     * Sets how many tasks are requested from the provider per call. A scheduling run that is
     * already in progress keeps the value it started with.
     */
    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    /** Enables or disables the per-task log lines. */
    public void setVerbose(boolean z) {
        this.verbose = z;
    }
}
