package com.expediagroup.rhapsody.core.transformer;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/* loaded from: input_file:com/expediagroup/rhapsody/core/transformer/MetricsTransformer.class */
public final class MetricsTransformer<T> implements Function<Publisher<T>, Publisher<T>> {
    private final AtomicInteger subscribers = new AtomicInteger();
    private final Counter items;
    private final Counter errors;

    public MetricsTransformer(MetricsConfig metricsConfig, MeterRegistry meterRegistry) {
        Gauge.builder(metricsConfig.getName() + ".subscribers", this.subscribers, (v0) -> {
            return v0.doubleValue();
        }).tags(metricsConfig.getTags()).baseUnit("subscribers").description("Number of current Subscribers").register(meterRegistry);
        this.items = Counter.builder(metricsConfig.getName() + ".items").tags(metricsConfig.getTags()).baseUnit("items").description("Number of Items emitted").register(meterRegistry);
        this.errors = Counter.builder(metricsConfig.getName() + ".errors").tags(metricsConfig.getTags()).baseUnit("errors").description("Number of Errors emitted").register(meterRegistry);
    }

    @Override // java.util.function.Function
    public Publisher<T> apply(Publisher<T> publisher) {
        return Flux.from(publisher).doOnSubscribe(subscription -> {
            this.subscribers.incrementAndGet();
        }).doOnNext(obj -> {
            this.items.increment();
        }).doOnError(th -> {
            this.errors.increment();
        }).doFinally(signalType -> {
            this.subscribers.decrementAndGet();
        });
    }
}
