package cz.encircled.jput.reactive;

import cz.encircled.jput.model.ExecutionRun;
import cz.encircled.jput.model.ExecutionRunResultDetails;
import cz.encircled.jput.model.PerfTestExecution;
import cz.encircled.jput.runner.ThreadBasedTestExecutor;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.ranges.RangesKt;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import reactor.core.publisher.FluxExtensionsKt;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* compiled from: ReactiveTestExecutor.kt */
@Metadata(mv = {1, 1, 15}, bv = {1, 0, 3}, k = 1, d1 = {"��\u001e\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\u0018��2\u00020\u0001B\u0005¢\u0006\u0002\u0010\u0002J\u001e\u0010\u0003\u001a\u00020\u00042\u0006\u0010\u0005\u001a\u00020\u00062\f\u0010\u0007\u001a\b\u0012\u0004\u0012\u00020\u00040\bH\u0016¨\u0006\t"}, d2 = {"Lcz/encircled/jput/reactive/ReactiveTestExecutor;", "Lcz/encircled/jput/runner/ThreadBasedTestExecutor;", "()V", "performExecution", "", "execution", "Lcz/encircled/jput/model/PerfTestExecution;", "statement", "Lkotlin/Function0;", "jput-reactive"})
/* loaded from: input_file:cz/encircled/jput/reactive/ReactiveTestExecutor.class */
public final class ReactiveTestExecutor extends ThreadBasedTestExecutor {
    public void performExecution(@NotNull final PerfTestExecution perfTestExecution, @NotNull Function0<Unit> function0) {
        Intrinsics.checkParameterIsNotNull(perfTestExecution, "execution");
        Intrinsics.checkParameterIsNotNull(function0, "statement");
        if (!perfTestExecution.getConf().isReactive()) {
            super.performExecution(perfTestExecution, function0);
            return;
        }
        function0.invoke();
        final Mono mono = (Mono) perfTestExecution.getExecutionParams().get("__executor");
        if (mono == null) {
            throw new IllegalStateException("Reactive function is not set for execution " + perfTestExecution + ", please use JPutReactive#reactiveTestBody");
        }
        for (List list : CollectionsKt.chunked(RangesKt.until(0, perfTestExecution.getConf().getWarmUp()), perfTestExecution.getConf().getParallelCount())) {
            final CountDownLatch countDownLatch = new CountDownLatch(list.size());
            FluxExtensionsKt.toFlux(list).flatMap(new Function<T, Publisher<? extends R>>() { // from class: cz.encircled.jput.reactive.ReactiveTestExecutor$performExecution$$inlined$forEach$lambda$1
                @Override // java.util.function.Function
                @NotNull
                public final Mono<?> apply(Integer num) {
                    return mono;
                }
            }).doOnNext(new Consumer<Object>() { // from class: cz.encircled.jput.reactive.ReactiveTestExecutor$performExecution$1$2
                @Override // java.util.function.Consumer
                public final void accept(Object obj) {
                    countDownLatch.countDown();
                }
            }).subscribe();
            countDownLatch.await();
        }
        final long rampUp = perfTestExecution.getConf().getRampUp() > 0 ? perfTestExecution.getConf().getRampUp() / (perfTestExecution.getConf().getParallelCount() - 1) : 0L;
        final CountDownLatch countDownLatch2 = new CountDownLatch(perfTestExecution.getConf().getParallelCount());
        FluxExtensionsKt.toFlux(RangesKt.until(0, perfTestExecution.getConf().getRepeats())).parallel(perfTestExecution.getConf().getParallelCount()).runOn(Schedulers.parallel()).flatMap(new Function<T, Publisher<? extends R>>() { // from class: cz.encircled.jput.reactive.ReactiveTestExecutor$performExecution$2
            @Override // java.util.function.Function
            public final Mono<Pair<Object, ExecutionRun>> apply(Integer num) {
                final ExecutionRun executionRun = new ExecutionRun(perfTestExecution, 0L, 0L, (ExecutionRunResultDetails) null, 14, (DefaultConstructorMarker) null);
                perfTestExecution.getExecutionResult().put(Long.valueOf(num.intValue()), executionRun);
                StringBuilder append = new StringBuilder().append("Ramp up: ");
                long j = rampUp;
                Intrinsics.checkExpressionValueIsNotNull(num, "index");
                System.out.println((Object) append.append(j * num.intValue()).toString());
                return mono.delayElement(Duration.ofMillis(rampUp * num.intValue())).map(new Function<T, R>() { // from class: cz.encircled.jput.reactive.ReactiveTestExecutor$performExecution$2.1
                    @Override // java.util.function.Function
                    @NotNull
                    public final Pair<Object, ExecutionRun> apply(Object obj) {
                        return new Pair<>(obj, executionRun);
                    }
                }).onErrorContinue(new BiConsumer<Throwable, Object>() { // from class: cz.encircled.jput.reactive.ReactiveTestExecutor$performExecution$2.2
                    @Override // java.util.function.BiConsumer
                    public final void accept(Throwable th, Object obj) {
                        executionRun.measureElapsed();
                        executionRun.setResultDetails(new ExecutionRunResultDetails((Integer) null, th, (String) null, 5, (DefaultConstructorMarker) null));
                    }
                });
            }
        }).doOnNext(new Consumer<Pair<? extends Object, ? extends ExecutionRun>>() { // from class: cz.encircled.jput.reactive.ReactiveTestExecutor$performExecution$3
            @Override // java.util.function.Consumer
            public /* bridge */ /* synthetic */ void accept(Pair<? extends Object, ? extends ExecutionRun> pair) {
                accept2((Pair<? extends Object, ExecutionRun>) pair);
            }

            /* renamed from: accept, reason: avoid collision after fix types in other method */
            public final void accept2(Pair<? extends Object, ExecutionRun> pair) {
                ((ExecutionRun) pair.getSecond()).measureElapsed();
            }
        }).doOnTerminate(new Runnable() { // from class: cz.encircled.jput.reactive.ReactiveTestExecutor$performExecution$4
            @Override // java.lang.Runnable
            public final void run() {
                countDownLatch2.countDown();
            }
        }).subscribe();
        countDownLatch2.await();
    }
}
