package com.expediagroup.rhapsody.core.transformer;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.api.Deduplication;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/expediagroup/rhapsody/core/transformer/DeduplicatingTransformer.class */
public final class DeduplicatingTransformer<T> implements Function<Publisher<T>, Publisher<T>> {
    private static final Scheduler DEFAULT_SCHEDULER = Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors() * 10, Integer.MAX_VALUE, DeduplicatingTransformer.class.getSimpleName());
    private final DeduplicationConfig config;
    private final Deduplication<T> deduplication;
    private final Scheduler sourceScheduler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/expediagroup/rhapsody/core/transformer/DeduplicatingTransformer$AcknowledgeableDeduplication.class */
    public static final class AcknowledgeableDeduplication<T> implements Deduplication<Acknowledgeable<T>> {
        private final Deduplication<T> deduplication;

        public AcknowledgeableDeduplication(Deduplication<T> deduplication) {
            this.deduplication = deduplication;
        }

        public Object extractKey(Acknowledgeable<T> acknowledgeable) {
            return this.deduplication.extractKey(acknowledgeable.get());
        }

        public Acknowledgeable<T> reduceDuplicates(Acknowledgeable<T> acknowledgeable, Acknowledgeable<T> acknowledgeable2) {
            Deduplication<T> deduplication = this.deduplication;
            Objects.requireNonNull(deduplication);
            return acknowledgeable.reduce(deduplication::reduceDuplicates, acknowledgeable2);
        }
    }

    private DeduplicatingTransformer(DeduplicationConfig deduplicationConfig, Deduplication<T> deduplication, Scheduler scheduler) {
        this.config = deduplicationConfig;
        this.deduplication = deduplication;
        this.sourceScheduler = scheduler;
    }

    public static <T> DeduplicatingTransformer<T> identity(DeduplicationConfig deduplicationConfig, Deduplication<T> deduplication) {
        return identity(deduplicationConfig, deduplication, DEFAULT_SCHEDULER);
    }

    public static <T> DeduplicatingTransformer<T> identity(DeduplicationConfig deduplicationConfig, Deduplication<T> deduplication, Scheduler scheduler) {
        return new DeduplicatingTransformer<>(deduplicationConfig, deduplication, scheduler);
    }

    public static <T> DeduplicatingTransformer<Acknowledgeable<T>> acknowledgeable(DeduplicationConfig deduplicationConfig, Deduplication<T> deduplication) {
        return acknowledgeable(deduplicationConfig, deduplication, DEFAULT_SCHEDULER);
    }

    public static <T> DeduplicatingTransformer<Acknowledgeable<T>> acknowledgeable(DeduplicationConfig deduplicationConfig, Deduplication<T> deduplication, Scheduler scheduler) {
        return new DeduplicatingTransformer<>(deduplicationConfig, new AcknowledgeableDeduplication(deduplication), scheduler);
    }

    @Override // java.util.function.Function
    public Publisher<T> apply(Publisher<T> publisher) {
        return this.config.isEnabled() ? applyDeduplication(publisher) : publisher;
    }

    private Flux<T> applyDeduplication(Publisher<T> publisher) {
        Scheduler single = Schedulers.single(this.sourceScheduler);
        Flux publishOn = Flux.from(publisher).publishOn(single, this.config.getDeduplicationSourcePrefetch());
        Deduplication<T> deduplication = this.deduplication;
        Objects.requireNonNull(deduplication);
        return publishOn.groupBy(deduplication::extractKey).flatMap(groupedFlux -> {
            return deduplicateGroup(groupedFlux, single);
        }, this.config.getDeduplicationConcurrency()).subscribeOn(single);
    }

    private Mono<T> deduplicateGroup(GroupedFlux<Object, T> groupedFlux, Scheduler scheduler) {
        Flux take = groupedFlux.take(this.config.getDeduplicationDuration(), scheduler).take(this.config.getMaxDeduplicationSize());
        Deduplication<T> deduplication = this.deduplication;
        Objects.requireNonNull(deduplication);
        return take.reduce(deduplication::reduceDuplicates);
    }
}
