package com.expediagroup.rhapsody.core.factory;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.api.PublisherFactory;
import com.expediagroup.rhapsody.api.SubscriberFactory;
import com.expediagroup.rhapsody.core.adapter.Adapters;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.WorkQueueProcessor;

/* loaded from: input_file:com/expediagroup/rhapsody/core/factory/InMemoryFactory.class */
/**
 * Static factory for in-memory publishers and subscribers that are wired together by name.
 *
 * <p>Every name maps to exactly one {@link WorkQueueProcessor}, created lazily on first use and
 * kept in a static map for the lifetime of the class. A subscriber obtained for a name pushes
 * items into that processor; a publisher obtained for the same name is that processor.
 *
 * <p>The element type {@code T} is chosen by each caller and is not recorded with the processor,
 * so all users of a given name must agree on the element type themselves.
 */
public final class InMemoryFactory {
    /** Lazily created processors keyed by name; entries are never removed by this class. */
    private static final Map<String, WorkQueueProcessor<?>> PROCESSORS_BY_NAME = new ConcurrentHashMap<>();

    /** Not instantiable: this class only exposes static factory methods. */
    private InMemoryFactory() {
    }

    /**
     * Returns a factory whose every invocation delegates to {@link #subscriber(String)}.
     *
     * @param str name of the in-memory processor to publish into
     * @param <T> element type
     * @return a factory that resolves the named processor each time it is invoked
     */
    public static <T> SubscriberFactory<T> subscriberFactory(String str) {
        return () -> subscriber(str);
    }

    /**
     * Returns a subscriber that forwards each received item to the processor registered under
     * the given name, creating that processor if it does not exist yet.
     *
     * @param str name of the in-memory processor to publish into
     * @param <T> element type
     * @return a subscriber adapted (via {@code Adapters.toSubscriber}) from the processor's {@code onNext}
     */
    public static <T> Subscriber<T> subscriber(String str) {
        WorkQueueProcessor<T> processor = getNamedProcessor(str);
        // Only onNext is handed to the adapter; how completion and error signals are treated
        // is decided by Adapters.toSubscriber, not here.
        return Adapters.toSubscriber(processor::onNext);
    }

    /**
     * Returns a factory whose every invocation delegates to {@link #publisher(String)}.
     *
     * @param str name of the in-memory processor to consume from
     * @param <T> element type
     * @return a factory that resolves the named processor each time it is invoked
     */
    public static <T> PublisherFactory<T> publisherFactory(String str) {
        return () -> publisher(str);
    }

    /**
     * Returns the named publisher with every item mapped through
     * {@code Adapters.toLoggingAcknowledgeable}.
     *
     * <p>The method name is misspelled ("acknowledegable"); it is left unchanged here because
     * renaming a public method would break existing callers.
     *
     * @param str name of the in-memory processor to consume from
     * @param <T> element type
     * @return a flux of acknowledgeable wrappers around the items of the named processor
     */
    public static <T> Flux<Acknowledgeable<T>> acknowledegablePublisher(String str) {
        // The explicit type witness is required: as a receiver expression, publisher(str) would
        // otherwise be inferred as Flux<Object> and the mapped result would not match the return type.
        return InMemoryFactory.<T>publisher(str).map(Adapters::toLoggingAcknowledgeable);
    }

    /**
     * Returns the processor registered under the given name as a {@link Flux}, creating it if
     * it does not exist yet. Repeated calls with the same name return the same instance.
     *
     * @param str name of the in-memory processor to consume from
     * @param <T> element type
     * @return the shared processor for {@code str}
     */
    public static <T> Flux<T> publisher(String str) {
        return getNamedProcessor(str);
    }

    /**
     * Looks up the processor for {@code name}, creating and registering it on first access.
     *
     * @param name processor name, also used as the map key
     * @param <T> element type the caller wants to view the processor as
     * @return the single processor instance associated with {@code name}
     */
    @SuppressWarnings("unchecked")
    private static <T> WorkQueueProcessor<T> getNamedProcessor(String name) {
        // Unchecked by design: the map holds processors of unknown element type and the caller
        // picks T. Nothing here verifies that producers and consumers of a name agree on T.
        return (WorkQueueProcessor<T>) PROCESSORS_BY_NAME.computeIfAbsent(name, InMemoryFactory::createNamedProcessor);
    }

    /**
     * Builds a new processor carrying the given name, with the builder's {@code share} flag set
     * to {@code true}.
     *
     * @param name name passed to the processor builder
     * @return a freshly built processor
     */
    private static WorkQueueProcessor<Object> createNamedProcessor(String name) {
        return WorkQueueProcessor.<Object>builder().name(name).share(true).build();
    }
}
