package com.expediagroup.rhapsody.core.adapter;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.api.SubscriberFactory;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/expediagroup/rhapsody/core/adapter/Adapters.class */
/**
 * Static factory methods that adapt between plain {@link Consumer}s, Reactive Streams
 * {@link Subscriber}s and Reactor {@link Flux} pipelines.
 *
 * <p>The class only holds static methods and cannot be instantiated.
 */
public final class Adapters {

    private Adapters() {
        // Static utility holder; never instantiated.
    }

    /**
     * Wraps a value in a new {@code LoggingAcknowledgeable}.
     *
     * @param t   the value to wrap
     * @param <T> the type of the wrapped value
     * @return an {@link Acknowledgeable} carrying {@code t}
     */
    public static <T> Acknowledgeable<T> toLoggingAcknowledgeable(T t) {
        // NOTE(review): LoggingAcknowledgeable is declared outside this file; if it is generic,
        // switch to the diamond form to remove the unchecked conversion - confirm.
        return new LoggingAcknowledgeable(t);
    }

    /**
     * Adapts a subscriber of plain values into a subscriber of {@link Acknowledgeable}s.
     *
     * <p>Every received {@link Acknowledgeable} is unwrapped through
     * {@link #toTerminalAcknowledgement(Acknowledgeable)} (in order, via {@code concatMap}) and the
     * resulting values are forwarded to {@code subscriber}.
     *
     * @param subscriber the downstream subscriber that receives the unwrapped values
     * @param <T>        the type of the wrapped values
     * @return a subscriber that accepts {@link Acknowledgeable}s of {@code T}
     */
    public static <T> Subscriber<Acknowledgeable<T>> toAcknowledgeableSubscriber(Subscriber<T> subscriber) {
        return toSendingSubscriber(
            acknowledgeables -> acknowledgeables.concatMap(Adapters::toTerminalAcknowledgement).subscribe(subscriber));
    }

    /**
     * Creates a subscriber whose received signals are exposed to {@code consumer} as a {@link Flux}.
     *
     * <p>{@code consumer} is invoked once, before this method returns, with the Flux view of the
     * returned subscriber.
     *
     * @param consumer callback that receives the Flux and is expected to wire up the pipeline
     * @param <T>      the element type
     * @return the subscriber side of the newly created processor
     */
    public static <T> Subscriber<T> toSendingSubscriber(Consumer<? super Flux<T>> consumer) {
        return toSendingProcessor(consumer);
    }

    /**
     * Wraps a consumer in a new {@code ConsumingSubscriber}.
     *
     * @param consumer the consumer to wrap
     * @param <T>      the element type
     * @return a subscriber backed by {@code consumer}
     */
    public static <T> Subscriber<T> toSubscriber(Consumer<T> consumer) {
        // NOTE(review): ConsumingSubscriber is declared outside this file; if it is generic,
        // switch to the diamond form to remove the unchecked conversion - confirm.
        return new ConsumingSubscriber(consumer);
    }

    /**
     * Creates a consumer that publishes each accepted value, as a single-element {@link Mono}, to a
     * subscriber freshly obtained from {@code subscriberFactory}.
     *
     * @param subscriberFactory factory asked for a new subscriber on every accepted value
     * @param <T>               the element type
     * @return a consumer that hands each value to a new subscriber
     */
    public static <T> Consumer<T> toConsumer(SubscriberFactory<T> subscriberFactory) {
        return value -> Mono.just(value).subscribe(subscriberFactory.create());
    }

    /**
     * Creates a consumer that delegates to a separate sending consumer for each calling thread.
     *
     * <p>The per-thread delegate is created lazily through {@link #toSendingConsumer(Consumer)} the
     * first time a given thread calls {@code accept}, so {@code consumer} is invoked once for every
     * such thread, each time with a distinct Flux.
     *
     * @param consumer callback that receives the Flux created for each thread
     * @param <T>      the element type
     * @return a consumer that routes values to the calling thread's own sending consumer
     */
    public static <T> Consumer<T> toSendingConsumerPerThread(Consumer<? super Flux<T>> consumer) {
        ThreadLocal<Consumer<T>> sendingConsumers = ThreadLocal.withInitial(() -> toSendingConsumer(consumer));
        return value -> sendingConsumers.get().accept(value);
    }

    /**
     * Creates a consumer whose accepted values are emitted into a {@link Flux} handed to
     * {@code consumer}.
     *
     * <p>{@code consumer} is invoked once, before this method returns. Each value accepted by the
     * returned consumer is pushed through the processor's {@link FluxSink#next(Object)}.
     *
     * @param consumer callback that receives the Flux and is expected to wire up the pipeline
     * @param <T>      the element type
     * @return a consumer that emits accepted values into the Flux
     */
    public static <T> Consumer<T> toSendingConsumer(Consumer<? super Flux<T>> consumer) {
        FluxSink<T> sink = toSendingProcessor(consumer).sink();
        // A bound method reference null-checks its receiver when evaluated, so no explicit
        // Objects.requireNonNull(sink) is needed here.
        return sink::next;
    }

    /**
     * Creates a consumer that runs every accepted value through {@code function} and blocks the
     * calling thread on the result.
     *
     * <p>For each value a single-element Flux is published through {@code function}, its elements
     * are ignored, and the calling thread blocks on a {@code BlockableTerminationSubscriber}
     * subscribed to it (presumably until the pipeline terminates - confirm against that class).
     *
     * @param function transformation applied to the single-element Flux of each value
     * @param <T>      the input element type
     * @param <R>      the element type produced by {@code function}
     * @return a consumer that processes each value synchronously
     */
    public static <T, R> Consumer<T> toSynchronousConsumer(Function<? super Flux<T>, ? extends Publisher<R>> function) {
        return value -> Flux.just(value)
            .publish(function)
            .ignoreElements()
            .subscribeWith(new BlockableTerminationSubscriber())
            .block();
    }

    /**
     * Variant of {@link #toSynchronousConsumer(Function)} that passes {@code duration} to the
     * blocking call.
     *
     * @param function transformation applied to the single-element Flux of each value
     * @param duration value handed to {@code BlockableTerminationSubscriber.block}; presumably the
     *                 maximum time to wait - confirm against that class
     * @param <T>      the input element type
     * @param <R>      the element type produced by {@code function}
     * @return a consumer that processes each value synchronously
     */
    public static <T, R> Consumer<T> toSynchronousConsumer(Function<? super Flux<T>, ? extends Publisher<R>> function, Duration duration) {
        return value -> Flux.just(value)
            .publish(function)
            .ignoreElements()
            .subscribeWith(new BlockableTerminationSubscriber())
            .block(duration);
    }

    /**
     * Creates an {@link EmitterProcessor} with a buffer size of 1 and immediately hands it, as a
     * Flux, to {@code consumer}.
     *
     * @param consumer callback that receives the processor's Flux view
     * @param <T>      the element type
     * @return the processor, usable both as subscriber and as publisher
     */
    private static <T> FluxProcessor<T, T> toSendingProcessor(Consumer<? super Flux<T>> consumer) {
        EmitterProcessor<T> processor = EmitterProcessor.create(1);
        consumer.accept(processor);
        return processor;
    }

    /**
     * Builds a Mono that emits the value held by {@code acknowledgeable} and runs its acknowledger
     * after the Mono terminates.
     *
     * @param acknowledgeable the wrapper to unwrap and acknowledge
     * @param <T>             the type of the wrapped value
     * @return a Mono emitting the wrapped value
     */
    private static <T> Mono<T> toTerminalAcknowledgement(Acknowledgeable<T> acknowledgeable) {
        return Mono.just(acknowledgeable)
            .doAfterTerminate(acknowledgeable.getAcknowledger())
            .map(Acknowledgeable::get);
    }
}
