package de.otto.synapse.subscription;

import com.google.common.collect.Maps;
import de.otto.synapse.channel.selector.Selector;
import de.otto.synapse.endpoint.BestMatchingSelectableComparator;
import de.otto.synapse.endpoint.MessageInterceptorRegistration;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.sender.MessageSenderEndpoint;
import de.otto.synapse.endpoint.sender.MessageSenderEndpointFactory;
import de.otto.synapse.subscription.events.SubscriptionCreated;
import de.otto.synapse.subscription.events.SubscriptionUpdated;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Service;

/**
 * Keeps track of {@link Subscription subscriptions} and wires them to message senders.
 *
 * <p>For every created subscription a {@link SubscriptionInterceptor} is registered at the
 * {@link MessageInterceptorRegistry} for the subscribed channel, and a
 * {@link MessageSenderEndpoint} for the response channel is created (once per response
 * channel) and cached. When a subscription is updated, snapshots of all currently subscribed
 * entities are sent to the subscription's target channel.
 *
 * <p>The bean is only created if {@link MessageSenderEndpoint} and {@link SnapshotProvider}
 * beans are present (see {@code @ConditionalOnBean}).
 */
@ConditionalOnBean({MessageSenderEndpoint.class, SnapshotProvider.class})
@Service
public class SubscriptionService {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionService.class);

    private final MessageInterceptorRegistry registry;
    private final List<MessageSenderEndpointFactory> senderEndpointFactories;
    /** Snapshot providers indexed by the name of the channel they provide snapshots for. */
    private final Map<String, SnapshotProvider> snapshotProviders;
    /** Sender endpoints indexed by response (target) channel name; created lazily, never removed. */
    private final ConcurrentMap<String, MessageSenderEndpoint> targetSenders = new ConcurrentHashMap<>();
    private final Subscriptions subscriptions = new Subscriptions();

    /**
     * Creates the service.
     *
     * @param messageInterceptorRegistry registry at which subscription interceptors are registered
     * @param list factories used to create the sender endpoints for response channels
     * @param list2 snapshot providers; indexed by {@code channelName()}, so channel names are
     *              expected to be unique (Guava's {@code Maps.uniqueIndex} rejects duplicate
     *              keys with an {@link IllegalArgumentException})
     */
    public SubscriptionService(MessageInterceptorRegistry messageInterceptorRegistry, List<MessageSenderEndpointFactory> list, List<SnapshotProvider> list2) {
        LOG.info("Initializing SubscriptionService for {}",
                list2.stream().map(SnapshotProvider::channelName).collect(Collectors.toList()));
        this.registry = messageInterceptorRegistry;
        this.senderEndpointFactories = list;
        this.snapshotProviders = Maps.uniqueIndex(list2, SnapshotProvider::channelName);
    }

    /**
     * Handles a newly created subscription: resolves (or creates) the sender for the response
     * channel, stores the subscription and registers a {@link SubscriptionInterceptor} for the
     * subscribed channel.
     *
     * @param subscriptionCreated the event describing the new subscription
     * @param cls selector used to pick the best matching {@link MessageSenderEndpointFactory}
     * @throws IllegalArgumentException if no snapshot provider is configured for the subscribed
     *         channel, or if no sender factory matches {@code cls}; the failure is logged
     *         before it is rethrown
     */
    public void onSubscriptionCreated(SubscriptionCreated subscriptionCreated, Class<? extends Selector> cls) {
        try {
            if (this.snapshotProviders.get(subscriptionCreated.getSubscribedChannel()) == null) {
                throw new IllegalArgumentException("No SnapshopProvider configured for channel " + subscriptionCreated.getSubscribedChannel());
            }
            final Subscription subscription = new Subscription(
                    subscriptionCreated.getId(),
                    subscriptionCreated.getSubscribedChannel(),
                    subscriptionCreated.getResponseChannel());

            // Resolve the sender BEFORE storing the subscription: if no factory matches, the
            // exception must not leave behind a subscription without a sender, which would
            // later fail in sendSnapshot() when the subscription is updated.
            final MessageSenderEndpoint sender = this.targetSenders.computeIfAbsent(
                    subscriptionCreated.getResponseChannel(),
                    responseChannel -> createSender(subscriptionCreated, cls));

            // NOTE(review): if a subscription with this id already exists, the interceptor below
            // is still registered with the freshly built instance - confirm this is intended.
            this.subscriptions.addIfMissing(subscription);
            this.registry.register(MessageInterceptorRegistration.matchingSenderChannelsWith(
                    subscription.getChannelName(),
                    new SubscriptionInterceptor(subscription, sender)));
        } catch (IllegalArgumentException e) {
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            LOG.error("unable to add a subscription to channel {}: {}",
                    subscriptionCreated.getSubscribedChannel(), e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Applies the subscribed / unsubscribed entities of the event to the existing subscription
     * and then sends snapshots for all entities the subscription is subscribed to afterwards.
     *
     * @param subscriptionUpdated the event describing the changes
     * @throws IllegalArgumentException if no subscription with the event's id exists
     */
    public void onSubscriptionUpdated(SubscriptionUpdated subscriptionUpdated) {
        final Subscription subscription = this.subscriptions
                .get(subscriptionUpdated.getId())
                .orElseThrow(() -> new IllegalArgumentException("Subscription " + subscriptionUpdated.getId() + " does not exist"));
        subscription.subscribe(subscriptionUpdated.getSubscribedEntities());
        subscription.unsubscribe(subscriptionUpdated.getUnsubscribedEntities());
        sendSnapshot(
                subscription.getSubscribedEntities(),
                subscription.getTargetChannelName(),
                this.snapshotProviders.get(subscription.getChannelName()));
    }

    /**
     * Removes the subscription with the given id from the managed subscriptions.
     *
     * <p>Only {@code Subscriptions.remove} is invoked here; this method does not touch the
     * interceptor registry or the cached sender endpoints.
     *
     * @param str id of the subscription to remove
     */
    public void onSubscriptionDeleted(String str) {
        this.subscriptions.remove(str);
    }

    /**
     * Returns the subscriptions managed by this service.
     *
     * @return the internal {@link Subscriptions} instance (not a copy)
     */
    public Subscriptions getSubscriptions() {
        return this.subscriptions;
    }

    /**
     * Creates a sender endpoint for the response channel of the given event, using the factory
     * that matches the selector best according to {@link BestMatchingSelectableComparator}.
     *
     * @throws IllegalArgumentException if none of the configured factories matches the selector
     */
    private MessageSenderEndpoint createSender(SubscriptionCreated subscriptionCreated, Class<? extends Selector> selector) {
        return this.senderEndpointFactories.stream()
                .filter(factory -> factory.matches(selector))
                .min(new BestMatchingSelectableComparator(selector))
                .orElseThrow(() -> new IllegalArgumentException("Unable to subscribe to " + subscriptionCreated.getSubscribedChannel() + " because no matching sender factory was found."))
                .create(subscriptionCreated.getResponseChannel());
    }

    /**
     * Sends one snapshot batch per entity to the sender cached for the target channel and
     * waits for each batch to complete ({@code join()}) before sending the next one.
     *
     * @param entityIds ids of the entities to send snapshots for
     * @param targetChannelName name of the channel whose cached sender is used
     * @param snapshotProvider provider producing the snapshot messages of an entity
     */
    private void sendSnapshot(Set<String> entityIds, String targetChannelName, SnapshotProvider snapshotProvider) {
        final MessageSenderEndpoint sender = this.targetSenders.get(targetChannelName);
        for (String entityId : entityIds) {
            sender.sendBatch(snapshotProvider.snapshot(entityId)).join();
        }
    }
}
