package com.expediagroup.rhapsody.core.stanza;

import com.expediagroup.rhapsody.api.SubscriberFactory;
import com.expediagroup.rhapsody.core.stanza.StanzaConfig;
import com.expediagroup.rhapsody.core.transformer.ListeningTransformer;
import com.expediagroup.rhapsody.core.transformer.MetricsConfig;
import com.expediagroup.rhapsody.core.transformer.MetricsTransformer;
import com.expediagroup.rhapsody.core.transformer.SchedulingTransformer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

/* loaded from: input_file:com/expediagroup/rhapsody/core/stanza/GroupedStanza.class */
/**
 * A {@link Stanza} whose source is a publisher of "group" publishers: every inner publisher
 * emitted by {@link #buildGroupPublisher} is subscribed to independently and pushed through the
 * same processing pipeline.
 *
 * <p>For each group the pipeline is, in order: inbound metrics, the prescheduler, scheduling,
 * the transformer, outbound metrics, and finally a subscriber obtained from the
 * {@link SubscriberFactory}.
 *
 * @param <C> the configuration type used to start this stanza
 * @param <U> the element type emitted by each group publisher
 * @param <V> the element type produced by the prescheduler and consumed by the transformer
 * @param <R> the element type produced by the transformer and consumed by the subscribers
 */
public abstract class GroupedStanza<C extends StanzaConfig, U, V, R> extends Stanza<C> {

    /**
     * Builds every pipeline stage from the given configuration, then subscribes to the group
     * publisher and wires each emitted group through those stages.
     *
     * <p>The stage instances are created once and shared by all groups; only the terminal
     * subscriber is created per group via {@link SubscriberFactory#create()}.
     *
     * @param c the configuration for this stanza
     * @return the {@link Disposable} of the subscription to the outer group publisher; the
     *         per-group subscriptions are not captured by this method
     */
    @Override
    protected final Disposable startDisposable(C c) {
        // Fall back to Micrometer's global registry when the config does not supply one.
        MeterRegistry meterRegistry = c.meterRegistry().orElse(Metrics.globalRegistry);
        MetricsConfig metricsConfig = c.metrics();

        // Same metrics configuration on both sides, distinguished only by the "flow" tag.
        MetricsTransformer<U> inboundMetrics =
            new MetricsTransformer<>(metricsConfig.withTags(Tags.of("flow", "inbound")), meterRegistry);
        MetricsTransformer<R> outboundMetrics =
            new MetricsTransformer<>(metricsConfig.withTags(Tags.of("flow", "outbound")), meterRegistry);

        Function<? super Publisher<U>, ? extends Publisher<V>> prescheduler = buildPrescheduler(c);
        SchedulingTransformer<V> scheduler = new SchedulingTransformer<>(c.scheduling(), c.name());
        Function<? super Publisher<V>, ? extends Publisher<R>> transformer = buildTransformer(c);
        SubscriberFactory<R> subscriberFactory = buildSubscriberFactory(c);

        return Flux.from(buildGroupPublisher(c))
            .transform(new ListeningTransformer<>(c.streamListeners()))
            .subscribe(group -> {
                // Each group gets its own subscription and its own freshly created subscriber.
                Flux.from(group)
                    .transform(inboundMetrics)
                    .transform(prescheduler)
                    .transform(scheduler)
                    .transform(transformer)
                    .transform(outboundMetrics)
                    .subscribe(subscriberFactory.create());
            });
    }

    /**
     * Supplies the outer publisher whose elements are the per-group publishers to process.
     *
     * @param c the configuration for this stanza
     * @return a publisher of group publishers
     */
    protected abstract Publisher<? extends Publisher<U>> buildGroupPublisher(C c);

    /**
     * Supplies the stage applied to each group after inbound metrics and before scheduling.
     *
     * @param c the configuration for this stanza
     * @return a function mapping a group's {@code U} stream to a {@code V} stream
     */
    protected abstract Function<? super Publisher<U>, ? extends Publisher<V>> buildPrescheduler(C c);

    /**
     * Supplies the stage applied to each group after scheduling and before outbound metrics.
     *
     * @param c the configuration for this stanza
     * @return a function mapping a group's {@code V} stream to an {@code R} stream
     */
    protected abstract Function<? super Publisher<V>, ? extends Publisher<R>> buildTransformer(C c);

    /**
     * Supplies the factory that creates one terminal subscriber per group.
     *
     * @param c the configuration for this stanza
     * @return the factory of subscribers for {@code R} elements
     */
    protected abstract SubscriberFactory<R> buildSubscriberFactory(C c);
}
