package com.expediagroup.rhapsody.core.work;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.api.Work;
import com.expediagroup.rhapsody.api.WorkHeader;
import com.expediagroup.rhapsody.api.WorkType;
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/expediagroup/rhapsody/core/work/WorkBufferer.class */
public final class WorkBufferer<T> implements Function<Publisher<T>, Flux<List<T>>> {
    private static final Scheduler DEFAULT_SCHEDULER = Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors() * 10, Integer.MAX_VALUE, WorkBufferer.class.getSimpleName());
    private final WorkBufferConfig config;
    private final Function<? super T, Work> workExtractor;
    private final Scheduler sourceScheduler;

    private WorkBufferer(WorkBufferConfig workBufferConfig, Function<? super T, Work> function, Scheduler scheduler) {
        this.config = workBufferConfig;
        this.workExtractor = function;
        this.sourceScheduler = scheduler;
    }

    public static <W extends Work> WorkBufferer<W> identity(WorkBufferConfig workBufferConfig) {
        return identity(workBufferConfig, DEFAULT_SCHEDULER);
    }

    public static <W extends Work> WorkBufferer<W> identity(WorkBufferConfig workBufferConfig, Scheduler scheduler) {
        return new WorkBufferer<>(workBufferConfig, Function.identity(), scheduler);
    }

    public static <W extends Work> WorkBufferer<Acknowledgeable<W>> acknowledgeable(WorkBufferConfig workBufferConfig) {
        return acknowledgeable(workBufferConfig, DEFAULT_SCHEDULER);
    }

    public static <W extends Work> WorkBufferer<Acknowledgeable<W>> acknowledgeable(WorkBufferConfig workBufferConfig, Scheduler scheduler) {
        return new WorkBufferer<>(workBufferConfig, (v0) -> {
            return v0.get();
        }, scheduler);
    }

    @Override // java.util.function.Function
    public Flux<List<T>> apply(Publisher<T> publisher) {
        Scheduler single = Schedulers.single(this.sourceScheduler);
        return Flux.from(publisher).publishOn(single, this.config.getBufferSourcePrefetch()).groupBy(obj -> {
            return extractHeader(obj).subject();
        }).flatMap(groupedFlux -> {
            return bufferGroup(groupedFlux, single);
        }, this.config.getBufferConcurrency()).subscribeOn(single);
    }

    private Mono<List<T>> bufferGroup(GroupedFlux<String, T> groupedFlux, Scheduler scheduler) {
        return groupedFlux.take(this.config.getBufferDuration(), scheduler).take(this.config.getMaxBufferSize()).takeUntil(this::shouldCloseBufferForWork).collectList();
    }

    private boolean shouldCloseBufferForWork(T t) {
        return extractHeader(t).type() == WorkType.COMMIT;
    }

    private WorkHeader extractHeader(T t) {
        return this.workExtractor.apply(t).workHeader();
    }
}
