package com.expediagroup.rhapsody.core.work;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.api.RxFailureConsumer;
import com.expediagroup.rhapsody.api.RxWorkPreparer;
import com.expediagroup.rhapsody.api.Work;
import com.expediagroup.rhapsody.api.WorkReducer;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/expediagroup/rhapsody/core/work/AcknowledgeableWorkBufferPreparer.class */
public class AcknowledgeableWorkBufferPreparer<W extends Work> implements Function<List<Acknowledgeable<W>>, Mono<Acknowledgeable<W>>> {
    private final WorkReducer<W> workReducer;
    private final RxWorkPreparer<W> workPreparer;
    private final RxFailureConsumer<W> failureConsumer;

    public AcknowledgeableWorkBufferPreparer(WorkReducer<W> workReducer, RxWorkPreparer<W> rxWorkPreparer, RxFailureConsumer<W> rxFailureConsumer) {
        this.workReducer = workReducer;
        this.workPreparer = rxWorkPreparer;
        this.failureConsumer = rxFailureConsumer;
    }

    @Override // java.util.function.Function
    public Mono<Acknowledgeable<W>> apply(List<Acknowledgeable<W>> list) {
        List collectNonCanceledAcknowledgeable = WorkBuffers.collectNonCanceledAcknowledgeable(list, (v0) -> {
            v0.acknowledge();
        });
        Flux fromIterable = Flux.fromIterable(collectNonCanceledAcknowledgeable);
        WorkReducer<W> workReducer = this.workReducer;
        Objects.requireNonNull(workReducer);
        return fromIterable.reduce(Acknowledgeable.reducing(workReducer::reduceTry)).flatMap(this::prepareIfNecessary).onErrorResume(th -> {
            return handleNonCanceledPreparationError(collectNonCanceledAcknowledgeable, th);
        });
    }

    private Mono<Acknowledgeable<W>> prepareIfNecessary(Acknowledgeable<W> acknowledgeable) {
        return ((Work) acknowledgeable.get()).isPrepared() ? Mono.just(acknowledgeable) : Mono.from(this.workPreparer.prepare((Work) acknowledgeable.get())).map(work -> {
            return acknowledgeable.propagate(work, acknowledgeable.getAcknowledger(), acknowledgeable.getNacknowledger());
        });
    }

    private Mono<? extends Acknowledgeable<W>> handleNonCanceledPreparationError(List<Acknowledgeable<W>> list, Throwable th) {
        Flux fromIterable = Flux.fromIterable(list);
        WorkReducer<W> workReducer = this.workReducer;
        Objects.requireNonNull(workReducer);
        return fromIterable.reduce(Acknowledgeable.reducing(workReducer::reduceFail)).flatMapMany(acknowledgeable -> {
            return consumeFailure(acknowledgeable, th);
        }).then(Mono.empty());
    }

    private Flux<?> consumeFailure(Acknowledgeable<W> acknowledgeable, Throwable th) {
        return Flux.from((Publisher) this.failureConsumer.apply((Work) acknowledgeable.get(), th)).doOnCancel(acknowledgeable.getAcknowledger()).doOnComplete(acknowledgeable.getAcknowledger()).doOnError(acknowledgeable.getNacknowledger()).onErrorResume(th2 -> {
            return Mono.empty();
        });
    }
}
