package com.expediagroup.rhapsody.core.transformer;

import com.expediagroup.rhapsody.api.StreamListener;
import com.expediagroup.rhapsody.api.StreamState;
import java.util.Collection;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/* loaded from: input_file:com/expediagroup/rhapsody/core/transformer/ListeningTransformer.class */
public final class ListeningTransformer<T> implements Function<Publisher<T>, Publisher<T>> {
    private final Collection<StreamListener> listeners;

    public ListeningTransformer(Collection<StreamListener> collection) {
        this.listeners = collection;
    }

    @Override // java.util.function.Function
    public Publisher<T> apply(Publisher<T> publisher) {
        return this.listeners.isEmpty() ? publisher : applyListening(publisher);
    }

    private Flux<T> applyListening(Publisher<T> publisher) {
        return Flux.from(publisher).doOnNext(this::notifyListenersOfNext).doOnSubscribe(subscription -> {
            notifyListenersOfState(StreamState.SUBSCRIBED);
        }).doOnCancel(() -> {
            notifyListenersOfState(StreamState.CANCELED);
        }).doOnError(th -> {
            notifyListenersOfState(StreamState.ERRORED, th);
        }).doOnComplete(() -> {
            notifyListenersOfState(StreamState.COMPLETED);
        });
    }

    private void notifyListenersOfNext(Object obj) {
        this.listeners.forEach(streamListener -> {
            streamListener.next(obj);
        });
    }

    private void notifyListenersOfState(StreamState streamState) {
        notifyListenersOfState(streamState, streamState.name());
    }

    private void notifyListenersOfState(StreamState streamState, Object obj) {
        this.listeners.forEach(streamListener -> {
            streamListener.stateChanged(streamState, obj);
        });
    }
}
