package code.ponfee.commons.concurrent;

import code.ponfee.commons.exception.ServerException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/* loaded from: input_file:code/ponfee/commons/concurrent/StreamForker.class */
public class StreamForker<T> {
    private final Stream<T> stream;
    private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:code/ponfee/commons/concurrent/StreamForker$BlockingQueueSpliterator.class */
    public static class BlockingQueueSpliterator<T> implements Spliterator<T> {
        private final BlockingQueue<T> q;

        BlockingQueueSpliterator(BlockingQueue<T> blockingQueue) {
            this.q = blockingQueue;
        }

        @Override // java.util.Spliterator
        public boolean tryAdvance(Consumer<? super T> consumer) {
            try {
                Object take = this.q.take();
                if (take == ForkingStreamConsumer.END_OF_STREAM) {
                    return false;
                }
                consumer.accept(take);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ServerException(e);
            }
        }

        @Override // java.util.Spliterator
        public Spliterator<T> trySplit() {
            return null;
        }

        @Override // java.util.Spliterator
        public long estimateSize() {
            return 0L;
        }

        @Override // java.util.Spliterator
        public int characteristics() {
            return 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:code/ponfee/commons/concurrent/StreamForker$ForkingStreamConsumer.class */
    public static class ForkingStreamConsumer<T> implements Consumer<T>, Results {
        private static final Object END_OF_STREAM = new Object();
        private final List<BlockingQueue<T>> queues;
        private final Map<Object, Future<?>> actions;

        ForkingStreamConsumer(List<BlockingQueue<T>> list, Map<Object, Future<?>> map) {
            this.queues = list;
            this.actions = map;
        }

        @Override // java.util.function.Consumer
        public void accept(T t) {
            this.queues.forEach(blockingQueue -> {
                blockingQueue.add(t);
            });
        }

        @Override // code.ponfee.commons.concurrent.StreamForker.Results
        public <R> R get(Object obj) {
            try {
                return (R) this.actions.get(obj).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        void finish() {
            accept(END_OF_STREAM);
        }
    }

    /* loaded from: input_file:code/ponfee/commons/concurrent/StreamForker$Results.class */
    public interface Results {
        <R> R get(Object obj);
    }

    public StreamForker(Stream<T> stream) {
        this.stream = stream;
    }

    public StreamForker<T> fork(Object obj, Function<Stream<T>, ?> function) {
        this.forks.put(obj, function);
        return this;
    }

    public Results getResults() {
        ForkingStreamConsumer<T> build = build();
        try {
            ((Stream) this.stream.sequential()).forEach(build);
            return build;
        } finally {
            build.finish();
        }
    }

    private ForkingStreamConsumer<T> build() {
        ArrayList arrayList = new ArrayList();
        return new ForkingStreamConsumer<>(arrayList, (Map) this.forks.entrySet().stream().reduce(new HashMap(), (hashMap, entry) -> {
            hashMap.put(entry.getKey(), getOperationResult(arrayList, (Function) entry.getValue()));
            return hashMap;
        }, (hashMap2, hashMap3) -> {
            hashMap2.putAll(hashMap3);
            return hashMap2;
        }));
    }

    private Future<?> getOperationResult(List<BlockingQueue<T>> list, Function<Stream<T>, ?> function) {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        list.add(linkedBlockingQueue);
        Stream stream = StreamSupport.stream(new BlockingQueueSpliterator(linkedBlockingQueue), false);
        return CompletableFuture.supplyAsync(() -> {
            return function.apply(stream);
        });
    }
}
