package cn.ideabuffer.process.core.aggregators;

import cn.ideabuffer.process.core.context.Context;
import cn.ideabuffer.process.core.nodes.DistributeMergeableNode;
import cn.ideabuffer.process.core.utils.AggregateUtils;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/ideabuffer/process/core/aggregators/ParallelDistributeAggregator.class */
public class ParallelDistributeAggregator<R> extends AbstractAggregator implements DistributeAggregator<R> {
    private final Logger logger;
    private Executor executor;
    private Class<R> resultClass;
    private long timeout;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cn/ideabuffer/process/core/aggregators/ParallelDistributeAggregator$MergerNode.class */
    public static class MergerNode<V, R> {
        private DistributeMergeableNode<V, R> node;
        private V value;
        private R result;

        MergerNode(DistributeMergeableNode<V, R> distributeMergeableNode, V v, R r) {
            this.node = distributeMergeableNode;
            this.value = v;
            this.result = r;
        }

        public void merge() {
            if (this.node.getProcessor() != null) {
                this.node.getProcessor().merge(this.value, this.result);
            }
        }
    }

    public ParallelDistributeAggregator(@NotNull Executor executor, @NotNull Class<R> cls) {
        this(executor, cls, 0L);
    }

    public ParallelDistributeAggregator(@NotNull Executor executor, @NotNull Class<R> cls, long j) {
        this(executor, cls, j, TimeUnit.MILLISECONDS);
    }

    public ParallelDistributeAggregator(@NotNull Executor executor, @NotNull Class<R> cls, long j, @NotNull TimeUnit timeUnit) {
        this.logger = LoggerFactory.getLogger(getClass());
        if (j < 0) {
            throw new IllegalArgumentException("timeout must >= 0");
        }
        this.executor = executor;
        this.resultClass = cls;
        this.timeout = timeUnit.toMillis(j);
    }

    @Override // cn.ideabuffer.process.core.aggregators.DistributeAggregator, cn.ideabuffer.process.core.aggregators.Aggregator
    @NotNull
    public R aggregate(@NotNull Context context, List<DistributeMergeableNode<?, R>> list) throws Exception {
        try {
            R newInstance = this.resultClass.newInstance();
            if (list == null || list.isEmpty()) {
                return newInstance;
            }
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(list.size());
            LinkedList linkedList = new LinkedList();
            list.forEach(distributeMergeableNode -> {
                linkedList.add(getFuture(context, distributeMergeableNode, linkedBlockingQueue, newInstance));
            });
            CompletableFuture<Void> allOf = CompletableFuture.allOf((CompletableFuture[]) linkedList.toArray(new CompletableFuture[0]));
            try {
                try {
                    if (this.timeout > 0) {
                        allOf.get(this.timeout, TimeUnit.MILLISECONDS);
                    } else {
                        allOf.get();
                    }
                    linkedBlockingQueue.forEach((v0) -> {
                        v0.merge();
                    });
                } catch (TimeoutException e) {
                    this.logger.error("aggregator timeout! timeout:{}", Long.valueOf(this.timeout), e);
                    linkedBlockingQueue.forEach((v0) -> {
                        v0.merge();
                    });
                }
                return newInstance;
            } catch (Throwable th) {
                linkedBlockingQueue.forEach((v0) -> {
                    v0.merge();
                });
                throw th;
            }
        } catch (Exception e2) {
            this.logger.error("aggregate error, resultClass:{}, nodes:{}", new Object[]{this.resultClass, list, e2});
            throw e2;
        }
    }

    private CompletableFuture<?> getFuture(Context context, DistributeMergeableNode<?, R> distributeMergeableNode, BlockingQueue<MergerNode<?, R>> blockingQueue, R r) {
        CompletableFuture<Void> thenAccept = CompletableFuture.supplyAsync(() -> {
            return AggregateUtils.process(context, distributeMergeableNode);
        }, this.executor).thenAccept(obj -> {
            blockingQueue.offer(new MergerNode(distributeMergeableNode, obj, r));
        });
        return distributeMergeableNode.getTimeout() <= 0 ? thenAccept : AggregateUtils.within(thenAccept, distributeMergeableNode.getTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override // cn.ideabuffer.process.core.aggregators.AbstractAggregator, cn.ideabuffer.process.core.Lifecycle
    public void destroy() {
        if (!(this.executor instanceof ExecutorService) || ((ExecutorService) this.executor).isShutdown()) {
            return;
        }
        ((ExecutorService) this.executor).shutdown();
    }
}
