package com.expediagroup.rhapsody.core.transformer;

import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/expediagroup/rhapsody/core/transformer/AutoAcknowledgingTransformer.class */
public final class AutoAcknowledgingTransformer<T, U> implements Function<Publisher<T>, Flux<T>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AutoAcknowledgingTransformer.class);
    private static final Scheduler SCHEDULER = Schedulers.newParallel(AutoAcknowledgingTransformer.class.getSimpleName(), Runtime.getRuntime().availableProcessors(), true);
    private final AutoAcknowledgementConfig config;
    private final Function<? super Flux<T>, ? extends Publisher<U>> reducer;
    private final Consumer<? super U> acknowledger;

    public AutoAcknowledgingTransformer(AutoAcknowledgementConfig autoAcknowledgementConfig, Function<? super Flux<T>, ? extends Publisher<U>> function, Consumer<? super U> consumer) {
        this.config = autoAcknowledgementConfig;
        this.reducer = function;
        this.acknowledger = consumer;
    }

    @Override // java.util.function.Function
    public Flux<T> apply(Publisher<T> publisher) {
        FluxSink<T> createAcknowledgingSink = createAcknowledgingSink();
        Flux concatMap = Flux.from(publisher).concatMap(obj -> {
            return Mono.just(obj).doAfterTerminate(() -> {
                createAcknowledgingSink.next(obj);
            });
        }, this.config.getPrefetch());
        Objects.requireNonNull(createAcknowledgingSink);
        Flux doOnCancel = concatMap.doOnCancel(createAcknowledgingSink::complete);
        Objects.requireNonNull(createAcknowledgingSink);
        return doOnCancel.doAfterTerminate(createAcknowledgingSink::complete);
    }

    private FluxSink<T> createAcknowledgingSink() {
        DirectProcessor create = DirectProcessor.create();
        create.window(this.config.getInterval(), SCHEDULER).doOnError(th -> {
            LOGGER.warn("Failed to window Acknowledgements. Resubscribing...", th);
        }).retry().concatMap(this.reducer).flatMapSequential(obj -> {
            return applyDelay(create, obj);
        }, calculateMaxConcurrentAcknowledgementWindows().intValue()).doOnNext(this.acknowledger).doOnError(th2 -> {
            LOGGER.warn("Failed to run acknowledger. Resubscribing...", th2);
        }).retry().subscribe();
        return create.sink();
    }

    private Mono<U> applyDelay(Flux<?> flux, U u) {
        return Mono.just(u).delayUntil(obj -> {
            return Mono.first(new Mono[]{flux.ignoreElements(), Mono.delay(this.config.getDelay(), SCHEDULER)});
        });
    }

    private Long calculateMaxConcurrentAcknowledgementWindows() {
        return Long.valueOf((this.config.getDelay().toMillis() / this.config.getInterval().toMillis()) + 2);
    }
}
