package com.microsoft.azure.spring.integration.servicebus.topic.support;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.spring.integration.core.api.PartitionSupplier;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusMessageHandler;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusTopicClientFactory;
import com.microsoft.azure.spring.integration.servicebus.topic.ServiceBusTopicTemplate;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.springframework.messaging.Message;

/* loaded from: input_file:com/microsoft/azure/spring/integration/servicebus/topic/support/ServiceBusTopicTestOperation.class */
/**
 * {@link ServiceBusTopicTemplate} variant that keeps sent messages in memory and
 * dispatches them directly to the handlers registered through
 * {@link #internalSubscribe(String, String, Consumer, Class)}.
 *
 * <p>{@code sendAsync} in this class does not delegate to the superclass; it only
 * records the converted message and invokes the locally tracked handlers.
 */
public class ServiceBusTopicTestOperation extends ServiceBusTopicTemplate {
    /** Every message passed to {@code sendAsync}, keyed by destination name. */
    private final Multimap<String, IMessage> topicsByName;

    /** Registered handlers: destination name -> (group -> handler). */
    private final Map<String, Map<String, ServiceBusMessageHandler<?>>> handlersByNameAndGroup;

    /**
     * Creates the operation with empty message and handler registries.
     *
     * @param serviceBusTopicClientFactory factory handed to the superclass and later used
     *                                     to obtain subscription clients
     */
    public ServiceBusTopicTestOperation(ServiceBusTopicClientFactory serviceBusTopicClientFactory) {
        super(serviceBusTopicClientFactory);
        this.topicsByName = ArrayListMultimap.create();
        this.handlersByNameAndGroup = new ConcurrentHashMap<>();
    }

    /**
     * Converts {@code message} to an {@link IMessage}, records it under {@code name}, and
     * synchronously hands it to every handler currently registered for {@code name}.
     *
     * @param name              destination name the message is recorded under
     * @param message           message to convert and dispatch
     * @param partitionSupplier not used by this implementation
     * @param <U>               payload type of {@code message}
     * @return an already-completed future
     */
    @Override // com.microsoft.azure.spring.integration.servicebus.ServiceBusTemplate
    public <U> CompletableFuture<Void> sendAsync(String name, Message<U> message, PartitionSupplier partitionSupplier) {
        IMessage converted = (IMessage) getMessageConverter().fromMessage(message, IMessage.class);
        this.topicsByName.put(name, converted);
        // computeIfAbsent creates the per-name handler map atomically and avoids
        // allocating a throwaway map on every send.
        this.handlersByNameAndGroup
                .computeIfAbsent(name, key -> new ConcurrentHashMap<>())
                .values()
                .forEach(handler -> handler.onMessageAsync(converted));
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Registers a new message handler on the subscription client for {@code name}/{@code group}
     * and tracks it locally so that {@code sendAsync} can dispatch to it.
     *
     * @param name     destination name
     * @param group    group identifying the subscription; a later registration for the same
     *                 name and group replaces the tracked handler
     * @param consumer callback passed to the created handler
     * @param cls      class passed through to the created handler
     * @throws ServiceBusRuntimeException if registering the handler on the client fails or
     *                                    the calling thread is interrupted
     */
    @Override // com.microsoft.azure.spring.integration.servicebus.topic.ServiceBusTopicTemplate
    protected void internalSubscribe(String name, String group, Consumer<Message<?>> consumer, Class<?> cls) {
        ISubscriptionClient subscriptionClient =
                ((ServiceBusTopicClientFactory) this.senderFactory).getOrCreateSubscriptionClient(name, group);
        // NOTE(review): the explicit `this` argument looks like a decompiler rendering of an
        // inner-class constructor call - confirm against TopicMessageHandler's declaration.
        ServiceBusTopicTemplate.TopicMessageHandler handler =
                new ServiceBusTopicTemplate.TopicMessageHandler(this, consumer, cls, subscriptionClient);
        try {
            subscriptionClient.registerMessageHandler(handler);
            // Track the handler only after the client accepted it.
            this.handlersByNameAndGroup
                    .computeIfAbsent(name, key -> new ConcurrentHashMap<>())
                    .put(group, handler);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new ServiceBusRuntimeException("Failed to internalSubscribe message handler", e);
        } catch (ServiceBusException e) {
            throw new ServiceBusRuntimeException("Failed to internalSubscribe message handler", e);
        }
    }

    /**
     * Stops dispatching messages sent to {@code name} to the handler tracked for {@code group}.
     * Only the local registry is modified; the subscription client is not touched.
     *
     * @param name  destination name
     * @param group group whose handler should be removed
     * @return always {@code true}, including when no handler was tracked for the given
     *         name and group
     */
    @Override // com.microsoft.azure.spring.integration.servicebus.topic.ServiceBusTopicTemplate
    public boolean unsubscribe(String name, String group) {
        Map<String, ServiceBusMessageHandler<?>> handlersByGroup = this.handlersByNameAndGroup.get(name);
        // Nothing was ever sent to or subscribed on this name: treat as a no-op instead of
        // failing with a NullPointerException.
        if (handlersByGroup != null) {
            handlersByGroup.remove(group);
        }
        return true;
    }
}
