package com.aol.simple.react.stream.traits;

import com.aol.simple.react.blockers.Blocker;
import com.aol.simple.react.collectors.ReactCollector;
import com.aol.simple.react.exceptions.ExceptionSoftener;
import com.aol.simple.react.exceptions.FilteredExecutionPathException;
import com.aol.simple.react.exceptions.ThrowsSoftened;
import com.aol.simple.react.extractors.Extractor;
import com.aol.simple.react.extractors.Extractors;
import com.aol.simple.react.stream.MissingValue;
import com.aol.simple.react.stream.Status;
import com.aol.simple.react.stream.StreamWrapper;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;

/* loaded from: input_file:com/aol/simple/react/stream/traits/BlockingStream.class */
/**
 * Mix-in of blocking terminal operations for a stream whose stages are
 * {@link CompletableFuture}s.
 *
 * <p>Each operation waits on the futures exposed by {@code getLastActive()} and
 * reports failures to the handler returned by {@code getErrorHandler()} (both are
 * presumably inherited from {@link ConfigurableStream} - they are not declared in
 * this file). Futures that fail yield {@link MissingValue#MISSING_VALUE}
 * internally, which is filtered out before results are collected.
 *
 * @param <U> element type produced by the final stage of the stream
 */
public interface BlockingStream<U> extends ConfigurableStream<U> {
    /** Shared softener used to rethrow checked exceptions without declaring them. */
    ExceptionSoftener exceptionSoftener = ExceptionSoftener.singleton.factory.getInstance();

    /**
     * Creates a {@link ReactCollector} bound to this stream.
     *
     * @return a new collector wrapping this stream
     */
    default ReactCollector<U> collectResults() {
        return new ReactCollector<>(this);
    }

    /**
     * Blocks the calling thread until every future of the last active stage has
     * finished and returns the successful results as a list.
     *
     * @return the results of all futures that completed without error
     */
    @ThrowsSoftened({InterruptedException.class, ExecutionException.class})
    default List<U> block() {
        return block(Collectors.toList(), getLastActive());
    }

    /**
     * Blocks until every future of the last active stage has finished and reduces
     * the successful results with the supplied collector.
     *
     * @param collector collector applied to the successful results
     * @param <R> result type produced by the collector
     * @return the collected result
     */
    @ThrowsSoftened({InterruptedException.class, ExecutionException.class})
    default <R> R block(Collector collector) {
        return block(collector, getLastActive());
    }

    /**
     * Waits on every future supplied by {@code streamWrapper}, drops the ones that
     * failed and collects the remaining results.
     *
     * @param collector collector applied to the successful results
     * @param streamWrapper source of the futures to wait on
     * @param <R> result type produced by the collector
     * @return the collected result
     */
    @SuppressWarnings("unchecked")
    default <R> R block(Collector collector, StreamWrapper streamWrapper) {
        return (R) streamWrapper.stream()
                .map(future -> getSafe(future, getErrorHandler()))
                // Failed futures are represented by the MISSING_VALUE sentinel.
                .filter(result -> result != MissingValue.MISSING_VALUE)
                .collect(collector);
    }

    /**
     * Waits on every future in {@code list}, drops the ones that failed and
     * collects the remaining results.
     *
     * @param collector collector applied to the successful results
     * @param list futures to wait on
     * @param optional optional handler notified of each failure
     * @param <R> result type produced by the collector
     * @return the collected result
     */
    @SuppressWarnings("unchecked")
    static <R> R aggregateResults(Collector collector, List<CompletableFuture> list, Optional<Consumer<Throwable>> optional) {
        return (R) list.stream()
                .map(future -> getSafe(future, optional))
                // Failed futures are represented by the MISSING_VALUE sentinel.
                .filter(result -> result != MissingValue.MISSING_VALUE)
                .collect(collector);
    }

    /**
     * Reports a failure to the optional error handler.
     *
     * <p>The handler receives the cause of {@code exc} when there is one, otherwise
     * {@code exc} itself, so it is never invoked with {@code null} (an
     * {@link InterruptedException}, for instance, normally has no cause). Failures
     * caused by a {@link FilteredExecutionPathException} are not reported.
     *
     * @param exc exception raised while waiting on a future
     * @param optional optional handler to notify
     */
    static void capture(Exception exc, Optional<Consumer<Throwable>> optional) {
        Throwable cause = exc.getCause();
        if (cause instanceof FilteredExecutionPathException) {
            return;
        }
        // Fall back to the exception itself rather than handing null to the handler.
        Throwable reported = cause != null ? cause : exc;
        optional.ifPresent(handler -> handler.accept(reported));
    }

    /**
     * Waits for {@code completableFuture} and returns its value, converting any
     * failure into {@link MissingValue#MISSING_VALUE} after reporting it through
     * {@link #capture(Exception, Optional)}.
     *
     * <p>On interruption the thread's interrupt flag is restored and the exception
     * is additionally passed to {@code exceptionSoftener.throwSoftenedException}
     * (presumably rethrown unchecked - confirm against ExceptionSoftener).
     *
     * @param completableFuture future to wait on
     * @param optional optional handler notified of a failure
     * @return the future's value, or {@code MissingValue.MISSING_VALUE} on failure
     */
    static Object getSafe(CompletableFuture completableFuture, Optional<Consumer<Throwable>> optional) {
        try {
            return completableFuture.get();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            capture(interrupted, optional);
            exceptionSoftener.throwSoftenedException(interrupted);
            // Only reached if the softener returns normally.
            return MissingValue.MISSING_VALUE;
        } catch (Exception failure) {
            // Covers ExecutionException as well as unchecked failures from get().
            capture(failure, optional);
            return MissingValue.MISSING_VALUE;
        }
    }

    /**
     * Blocks and returns the element selected by {@code Extractors.first()}.
     *
     * <p>NOTE(review): the "at least one completed" predicate supplied here is
     * forwarded to {@link #blockAndExtract(Extractor, Predicate)}, which currently
     * does not consult it.
     *
     * @return the extracted element
     */
    @ThrowsSoftened({InterruptedException.class, ExecutionException.class})
    default U first() {
        return blockAndExtract(Extractors.first(), status -> status.getCompleted() > 0);
    }

    /**
     * Blocks and returns the element selected by {@code Extractors.last()}.
     *
     * @return the extracted element
     */
    @ThrowsSoftened({InterruptedException.class, ExecutionException.class})
    default U last() {
        return blockAndExtract(Extractors.last());
    }

    /**
     * Blocks and applies {@code extractor} to the results, using a predicate that
     * is always {@code false}.
     *
     * @param extractor extractor applied to the list of results
     * @param <R> type of the extracted value
     * @return the value produced by the extractor
     */
    @ThrowsSoftened({InterruptedException.class, ExecutionException.class})
    default <R> R blockAndExtract(Extractor extractor) {
        return blockAndExtract(extractor, status -> false);
    }

    /**
     * Blocks via {@link #block()} and applies {@code extractor} to the results.
     *
     * <p>NOTE(review): {@code predicate} is accepted but not used - this method
     * always waits through {@code block()}. Routing it through
     * {@link #block(Predicate)} may be the original intent; confirm against
     * Blocker's result semantics before changing.
     *
     * @param extractor extractor applied to the list of results
     * @param predicate break-out condition (currently ignored)
     * @param <R> type of the extracted value
     * @return the value produced by the extractor
     */
    @SuppressWarnings("unchecked")
    @ThrowsSoftened({InterruptedException.class, ExecutionException.class})
    default <R> R blockAndExtract(Extractor extractor, Predicate<Status> predicate) {
        return (R) extractor.extract(block());
    }

    /**
     * Blocks using a {@link Blocker} built from the futures of the last active
     * stage and this stream's error handler, passing it the supplied predicate.
     *
     * @param predicate condition handed to {@code Blocker.block}
     * @return the list returned by the blocker
     */
    @ThrowsSoftened({InterruptedException.class, ExecutionException.class})
    default List<U> block(Predicate<Status> predicate) {
        return new Blocker(getLastActive().list(), getErrorHandler()).block(predicate);
    }

    /**
     * Blocks as in {@link #block(Predicate)} and reduces the returned list with
     * the supplied collector.
     *
     * @param collector collector applied to the returned results
     * @param predicate condition handed to {@code Blocker.block}
     * @param <R> result type produced by the collector
     * @return the collected result
     */
    @SuppressWarnings("unchecked")
    @ThrowsSoftened({InterruptedException.class, ExecutionException.class})
    default <R> R block(Collector collector, Predicate<Status> predicate) {
        return (R) block(predicate).stream().collect(collector);
    }

    /**
     * Collects the results through {@link #collectResults()}, blocks, and submits
     * {@code function} to transform the resulting list.
     *
     * @param function transformation applied to the list of results
     * @param <R> type produced by the transformation
     * @return the value returned by {@code function}
     */
    default <R> R submitAndBlock(Function<List<U>, R> function) {
        return (R) collectResults().block().submit(results -> function.apply(results));
    }
}
