package cn.ideabuffer.process.core.aggregator;

import cn.ideabuffer.process.core.context.Context;
import cn.ideabuffer.process.core.nodes.MergeableNode;
import cn.ideabuffer.process.core.nodes.merger.Merger;
import cn.ideabuffer.process.core.util.AggregateUtils;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/ideabuffer/process/core/aggregator/ParallelGenericAggregator.class */
public class ParallelGenericAggregator<I, O> implements GenericAggregator<I, O> {
    protected final Logger logger;
    private Executor executor;
    private Merger<I, O> merger;
    private long timeout;

    public ParallelGenericAggregator(@NotNull Executor executor, @NotNull Merger<I, O> merger) {
        this(executor, merger, 0L);
    }

    public ParallelGenericAggregator(@NotNull Executor executor, @NotNull Merger<I, O> merger, long j) {
        this(executor, merger, j, TimeUnit.MILLISECONDS);
    }

    public ParallelGenericAggregator(@NotNull Executor executor, @NotNull Merger<I, O> merger, long j, @NotNull TimeUnit timeUnit) {
        this.logger = LoggerFactory.getLogger(getClass());
        if (j < 0) {
            throw new IllegalArgumentException("timeout must >= 0");
        }
        this.executor = executor;
        this.merger = merger;
        this.timeout = timeUnit.toMillis(j);
    }

    @Override // cn.ideabuffer.process.core.aggregator.Aggregator
    @Nullable
    public O aggregate(@NotNull Context context, List<MergeableNode<I>> list) throws Exception {
        if (list == null || list.isEmpty()) {
            return null;
        }
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(list.size());
        LinkedList linkedList = new LinkedList();
        LinkedList linkedList2 = new LinkedList();
        list.forEach(mergeableNode -> {
            linkedList.add(getFuture(context, mergeableNode, linkedBlockingQueue));
        });
        CompletableFuture<Void> allOf = CompletableFuture.allOf((CompletableFuture[]) linkedList.toArray(new CompletableFuture[0]));
        try {
            try {
                if (this.timeout > 0) {
                    allOf.get(this.timeout, TimeUnit.MILLISECONDS);
                } else {
                    allOf.get();
                }
                linkedBlockingQueue.drainTo(linkedList2);
            } catch (TimeoutException e) {
                this.logger.error("timeout!", e);
                linkedBlockingQueue.drainTo(linkedList2);
            }
            return this.merger.merge(linkedList2);
        } catch (Throwable th) {
            linkedBlockingQueue.drainTo(linkedList2);
            throw th;
        }
    }

    private CompletableFuture<?> getFuture(Context context, MergeableNode<I> mergeableNode, BlockingQueue<I> blockingQueue) {
        CompletableFuture<Void> exceptionally = CompletableFuture.supplyAsync(() -> {
            return AggregateUtils.process(context, mergeableNode);
        }, this.executor).thenAccept(obj -> {
            if (obj != null) {
                blockingQueue.offer(obj);
            }
        }).exceptionally(th -> {
            this.logger.error("process error! node:{}", mergeableNode, th);
            return null;
        });
        return mergeableNode.getTimeout() <= 0 ? exceptionally : AggregateUtils.within(exceptionally, mergeableNode.getTimeout(), TimeUnit.MILLISECONDS);
    }
}
