package net.corda.nodeapi.internal.bridging;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import net.corda.core.concurrent.CordaFuture;
import net.corda.core.identity.CordaX500Name;
import net.corda.core.utilities.KotlinUtilsKt;
import net.corda.core.utilities.NetworkHostAndPort;
import net.corda.nodeapi.internal.ArtemisMessagingClient;
import net.corda.nodeapi.internal.ArtemisMessagingComponent;
import net.corda.nodeapi.internal.ArtemisSessionProvider;
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager;
import net.corda.nodeapi.internal.config.CertificateStore;
import net.corda.nodeapi.internal.config.CertificateStoreSupplier;
import net.corda.nodeapi.internal.config.MutualSslConfiguration;
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus;
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage;
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient;
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration;
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.MDC;
import rx.Subscription;
import rx.functions.Action1;

/* compiled from: AMQPBridgeManager.kt */
@Metadata(mv = {1, 1, 11}, bv = {1, AMQPBridgeManager.NUM_BRIDGE_THREADS, AMQPClient.NUM_CLIENT_THREADS}, k = 1, d1 = {"��v\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\b\n��\n\u0002\u0010\u000b\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010%\n\u0002\u0010\u000e\n\u0002\u0010!\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0003\n\u0002\u0010 \n��\n\u0002\u0010\"\n\u0002\u0018\u0002\n\u0002\b\u0007\u0018�� +2\u00020\u0001:\u0003)*+B'\b\u0016\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t¢\u0006\u0002\u0010\nB7\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t\u0012\f\u0010\u000b\u001a\b\u0012\u0004\u0012\u00020\r0\f\u0012\n\b\u0002\u0010\u000e\u001a\u0004\u0018\u00010\u000f¢\u0006\u0002\u0010\u0010J\b\u0010\u001d\u001a\u00020\u001eH\u0016J,\u0010\u001f\u001a\u00020\u001e2\u0006\u0010 \u001a\u00020\u00182\f\u0010!\u001a\b\u0012\u0004\u0012\u00020\u00050\"2\f\u0010#\u001a\b\u0012\u0004\u0012\u00020%0$H\u0016J\u001e\u0010&\u001a\u00020\u001e2\u0006\u0010 \u001a\u00020\u00182\f\u0010!\u001a\b\u0012\u0004\u0012\u00020\u00050\"H\u0016J\b\u0010'\u001a\u00020\u001eH\u0016J\b\u0010(\u001a\u00020\u001eH\u0016R\u000e\u0010\u0011\u001a\u00020\u0012X\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\u0013\u001a\u0004\u0018\u00010\rX\u0082\u000e¢\u0006\u0002\n��R\u0014\u0010\u000b\u001a\b\u0012\u0004\u0012\u00020\r0\fX\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\u000e\u001a\u0004\u0018\u00010\u000fX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0014\u001a\u00020\u0015X\u0082\u0004¢\u0006\u0002\n��R 
\u0010\u0016\u001a\u0014\u0012\u0004\u0012\u00020\u0018\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u001a0\u00190\u0017X\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\u001b\u001a\u0004\u0018\u00010\u001cX\u0082\u000e¢\u0006\u0002\n��¨\u0006,"}, d2 = {"Lnet/corda/nodeapi/internal/bridging/AMQPBridgeManager;", "Lnet/corda/nodeapi/internal/bridging/BridgeManager;", "config", "Lnet/corda/nodeapi/internal/config/MutualSslConfiguration;", "p2pAddress", "Lnet/corda/core/utilities/NetworkHostAndPort;", "maxMessageSize", "", "crlCheckSoftFail", "", "(Lnet/corda/nodeapi/internal/config/MutualSslConfiguration;Lnet/corda/core/utilities/NetworkHostAndPort;IZ)V", "artemisMessageClientFactory", "Lkotlin/Function0;", "Lnet/corda/nodeapi/internal/ArtemisSessionProvider;", "bridgeMetricsService", "Lnet/corda/nodeapi/internal/bridging/BridgeMetricsService;", "(Lnet/corda/nodeapi/internal/config/MutualSslConfiguration;IZLkotlin/jvm/functions/Function0;Lnet/corda/nodeapi/internal/bridging/BridgeMetricsService;)V", "amqpConfig", "Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration;", "artemis", "lock", "Ljava/util/concurrent/locks/ReentrantLock;", "queueNamesToBridgesMap", "", "", "", "Lnet/corda/nodeapi/internal/bridging/AMQPBridgeManager$AMQPBridge;", "sharedEventLoopGroup", "Lio/netty/channel/EventLoopGroup;", "close", "", "deployBridge", "queueName", "targets", "", "legalNames", "", "Lnet/corda/core/identity/CordaX500Name;", "destroyBridge", "start", "stop", "AMQPBridge", "AMQPConfigurationImpl", "Companion", "node-api"})
/* loaded from: input_file:net/corda/nodeapi/internal/bridging/AMQPBridgeManager.class */
public final class AMQPBridgeManager implements BridgeManager {
    private final ReentrantLock lock;
    private final Map<String, List<AMQPBridge>> queueNamesToBridgesMap;
    private final AMQPConfiguration amqpConfig;
    private EventLoopGroup sharedEventLoopGroup;
    private ArtemisSessionProvider artemis;
    private final Function0<ArtemisSessionProvider> artemisMessageClientFactory;
    private final BridgeMetricsService bridgeMetricsService;
    private static final int NUM_BRIDGE_THREADS = 0;
    public static final Companion Companion = new Companion(null);

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: AMQPBridgeManager.kt */
    @Metadata(mv = {1, 1, 11}, bv = {1, AMQPBridgeManager.NUM_BRIDGE_THREADS, AMQPClient.NUM_CLIENT_THREADS}, k = 1, d1 = {"��\u0082\u0001\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0010\u000e\n��\n\u0002\u0010 \n\u0002\u0018\u0002\n��\n\u0002\u0010\"\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010\u000b\n\u0002\b\u0006\b\u0002\u0018�� 52\u00020\u0001:\u00015BK\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\f\u0010\u0004\u001a\b\u0012\u0004\u0012\u00020\u00060\u0005\u0012\f\u0010\u0007\u001a\b\u0012\u0004\u0012\u00020\t0\b\u0012\u0006\u0010\n\u001a\u00020\u000b\u0012\u0006\u0010\f\u001a\u00020\r\u0012\u0006\u0010\u000e\u001a\u00020\u000f\u0012\b\u0010\u0010\u001a\u0004\u0018\u00010\u0011¢\u0006\u0002\u0010\u0012J\u0010\u0010%\u001a\u00020&2\u0006\u0010'\u001a\u00020(H\u0002J\u0016\u0010)\u001a\u00020&2\f\u0010*\u001a\b\u0012\u0004\u0012\u00020\u00030+H\u0002J\u0010\u0010,\u001a\u00020&2\u0006\u0010*\u001a\u00020\u0003H\u0002J\u0010\u0010-\u001a\u00020&2\u0006\u0010*\u001a\u00020\u0003H\u0002J\u0010\u0010.\u001a\u00020&2\u0006\u0010/\u001a\u000200H\u0002J\u0006\u00101\u001a\u00020&J\u0006\u00102\u001a\u00020&J\u0016\u00103\u001a\u00020&2\f\u00104\u001a\b\u0012\u0004\u0012\u00020&0+H\u0002R\u0011\u0010\u0013\u001a\u00020\u0014¢\u0006\b\n��\u001a\u0004\b\u0015\u0010\u0016R\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000e\u001a\u00020\u000fX\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\u0010\u001a\u0004\u0018\u00010\u0011X\u0082\u0004¢\u0006\u0002\n��R\u0010\u0010\u0017\u001a\u0004\u0018\u00010\u0018X\u0082\u000e¢\u0006\u0002\n��R\u0010\u0010\u0019\u001a\u0004\u0018\u00010\u001
aX\u0082\u000e¢\u0006\u0002\n��R\u0017\u0010\u0007\u001a\b\u0012\u0004\u0012\u00020\t0\b¢\u0006\b\n��\u001a\u0004\b\u001b\u0010\u001cR\u000e\u0010\u001d\u001a\u00020\u001eX\u0082\u0004¢\u0006\u0002\n��R\u0011\u0010\u0002\u001a\u00020\u0003¢\u0006\b\n��\u001a\u0004\b\u001f\u0010 R\u0010\u0010!\u001a\u0004\u0018\u00010\"X\u0082\u000e¢\u0006\u0002\n��R\u0017\u0010\u0004\u001a\b\u0012\u0004\u0012\u00020\u00060\u0005¢\u0006\b\n��\u001a\u0004\b#\u0010$¨\u00066"}, d2 = {"Lnet/corda/nodeapi/internal/bridging/AMQPBridgeManager$AMQPBridge;", "", "queueName", "", "targets", "", "Lnet/corda/core/utilities/NetworkHostAndPort;", "legalNames", "", "Lnet/corda/core/identity/CordaX500Name;", "amqpConfig", "Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration;", "sharedEventGroup", "Lio/netty/channel/EventLoopGroup;", "artemis", "Lnet/corda/nodeapi/internal/ArtemisSessionProvider;", "bridgeMetricsService", "Lnet/corda/nodeapi/internal/bridging/BridgeMetricsService;", "(Ljava/lang/String;Ljava/util/List;Ljava/util/Set;Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration;Lio/netty/channel/EventLoopGroup;Lnet/corda/nodeapi/internal/ArtemisSessionProvider;Lnet/corda/nodeapi/internal/bridging/BridgeMetricsService;)V", "amqpClient", "Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPClient;", "getAmqpClient", "()Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPClient;", "connectedSubscription", "Lrx/Subscription;", "consumer", "Lorg/apache/activemq/artemis/api/core/client/ClientConsumer;", "getLegalNames", "()Ljava/util/Set;", "lock", "Ljava/util/concurrent/locks/ReentrantLock;", "getQueueName", "()Ljava/lang/String;", "session", "Lorg/apache/activemq/artemis/api/core/client/ClientSession;", "getTargets", "()Ljava/util/List;", "clientArtemisMessageHandler", "", "artemisMessage", "Lorg/apache/activemq/artemis/api/core/client/ClientMessage;", "logDebugWithMDC", "msg", "Lkotlin/Function0;", "logInfoWithMDC", "logWarnWithMDC", "onSocketConnected", "connected", 
"", "start", "stop", "withMDC", "block", "Companion", "node-api"})
    /* loaded from: input_file:net/corda/nodeapi/internal/bridging/AMQPBridgeManager$AMQPBridge.class */
    public static final class AMQPBridge {

        /** AMQP client for this bridge; built in the constructor from the targets, legal names, AMQP config and event loop group. */
        @NotNull
        private final AMQPClient amqpClient;
        /** Lock taken around the code that reads or replaces {@link #session} and {@link #consumer}. */
        private final ReentrantLock lock;
        /** Artemis session created when the socket connects; reset to null on disconnect and in {@code stop()}. */
        private ClientSession session;
        /** Consumer on {@link #queueName}, created together with {@link #session}; null while disconnected. */
        private ClientConsumer consumer;
        /** Subscription to the AMQP client's connection events; set in {@code start()}, cleared in {@code stop()}. */
        private Subscription connectedSubscription;

        /** Name of the Artemis queue this bridge consumes from. */
        @NotNull
        private final String queueName;

        /** Remote addresses handed to the AMQP client. */
        @NotNull
        private final List<NetworkHostAndPort> targets;

        /** Legal names handed to the AMQP client; the first one is used as the name passed to {@code createMessage}. */
        @NotNull
        private final Set<CordaX500Name> legalNames;
        /** AMQP settings; this class reads the maximum message size from it. */
        private final AMQPConfiguration amqpConfig;
        /** Provider of the started Artemis client; also used as the monitor for {@code synchronized} blocks. */
        private final ArtemisSessionProvider artemis;
        /** Optional metrics sink (may be null); every use is null-checked. */
        private final BridgeMetricsService bridgeMetricsService;
        /** Singleton instance of the Kotlin companion object. */
        public static final Companion Companion = new Companion(null);
        /** Logger obtained through {@code contextLogger} on the companion instance. */
        private static final Logger log = KotlinUtilsKt.contextLogger(Companion);

        /* compiled from: AMQPBridgeManager.kt */
        @Metadata(mv = {1, 1, 11}, bv = {1, AMQPBridgeManager.NUM_BRIDGE_THREADS, AMQPClient.NUM_CLIENT_THREADS}, k = 1, d1 = {"��\u0012\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0005"}, d2 = {"Lnet/corda/nodeapi/internal/bridging/AMQPBridgeManager$AMQPBridge$Companion;", "", "()V", "log", "Lorg/slf4j/Logger;", "node-api"})
        /* loaded from: input_file:net/corda/nodeapi/internal/bridging/AMQPBridgeManager$AMQPBridge$Companion.class */
        /**
         * Holder class for the Kotlin companion object of {@code AMQPBridge}. It carries no state
         * of its own; the single instance is passed to {@code contextLogger} to obtain the logger.
         */
        public static final class Companion {
            // Private: instances are only created through the synthetic constructor below.
            private Companion() {
            }

            // Compiler-generated constructor; the marker argument is ignored and only
            // distinguishes this signature from the private no-arg constructor.
            public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
                this();
            }
        }

        /**
         * Runs {@code function0} with this bridge's details published in the SLF4J MDC and restores
         * the MDC content that was present before the call, whether or not the block throws.
         *
         * <p>The keys written are {@code queueName}, {@code targets}, {@code legalNames} and
         * {@code maxMessageSize}; list values are joined with {@code ";"}.
         *
         * @param function0 the action to run while the bridge context is present in the MDC
         */
        private final void withMDC(Function0<Unit> function0) {
            // getCopyOfContextMap() may return null when nothing has been put into the MDC yet.
            // Handing that null back to setContextMap() fails on adapters that copy the map, so
            // restore an empty context instead.
            Map<String, String> previousContext = MDC.getCopyOfContextMap();
            if (previousContext == null) {
                previousContext = new HashMap<String, String>();
            }
            try {
                MDC.put("queueName", this.queueName);
                // NOTE(review): 30 looks like the Kotlin-generated "defaults" bitmask, in which case the
                // null / NUM_BRIDGE_THREADS placeholder arguments are ignored by joinToString - confirm.
                MDC.put("targets", CollectionsKt.joinToString$default(this.targets, ";", (CharSequence) null, (CharSequence) null, AMQPBridgeManager.NUM_BRIDGE_THREADS, (CharSequence) null, new Function1<NetworkHostAndPort, String>() { // from class: net.corda.nodeapi.internal.bridging.AMQPBridgeManager$AMQPBridge$withMDC$1
                    @NotNull
                    public final String invoke(@NotNull NetworkHostAndPort networkHostAndPort) {
                        Intrinsics.checkParameterIsNotNull(networkHostAndPort, "it");
                        return networkHostAndPort.toString();
                    }
                }, 30, (Object) null));
                MDC.put("legalNames", CollectionsKt.joinToString$default(this.legalNames, ";", (CharSequence) null, (CharSequence) null, AMQPBridgeManager.NUM_BRIDGE_THREADS, (CharSequence) null, new Function1<CordaX500Name, String>() { // from class: net.corda.nodeapi.internal.bridging.AMQPBridgeManager$AMQPBridge$withMDC$2
                    @NotNull
                    public final String invoke(@NotNull CordaX500Name cordaX500Name) {
                        Intrinsics.checkParameterIsNotNull(cordaX500Name, "it");
                        return cordaX500Name.toString();
                    }
                }, 30, (Object) null));
                MDC.put("maxMessageSize", String.valueOf(this.amqpConfig.getMaxMessageSize()));
                function0.invoke();
            } finally {
                // Single restore point for both the normal and the exceptional path.
                MDC.setContextMap(previousContext);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Writes a debug-level entry with the bridge context in the MDC.
         *
         * <p>The message supplier is only invoked when debug logging is enabled, so callers can
         * pass a supplier that is costly to evaluate.
         *
         * @param function0 supplier of the message text
         */
        public final void logDebugWithMDC(final Function0<String> function0) {
            // Nothing to do (no MDC set-up, no message construction) when debug is off.
            if (!log.isDebugEnabled()) {
                return;
            }
            final Function0<Unit> emitDebug = new Function0<Unit>() {
                @Override
                public Unit invoke() {
                    log.debug(function0.invoke());
                    return Unit.INSTANCE;
                }
            };
            withMDC(emitDebug);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Writes an info-level entry with the bridge context in the MDC.
         *
         * @param str the message text
         */
        public final void logInfoWithMDC(final String str) {
            final Function0<Unit> emitInfo = new Function0<Unit>() {
                @Override
                public Unit invoke() {
                    log.info(str);
                    return Unit.INSTANCE;
                }
            };
            withMDC(emitInfo);
        }

        /**
         * Writes a warn-level entry with the bridge context in the MDC.
         *
         * @param str the message text
         */
        private final void logWarnWithMDC(final String str) {
            final Function0<Unit> emitWarn = new Function0<Unit>() {
                @Override
                public Unit invoke() {
                    log.warn(str);
                    return Unit.INSTANCE;
                }
            };
            withMDC(emitWarn);
        }

        /**
         * Returns the AMQP client owned by this bridge.
         *
         * @return the client created in the constructor; never null
         */
        @NotNull
        public final AMQPClient getAmqpClient() {
            return this.amqpClient;
        }

        /**
         * Starts the bridge: subscribes to the AMQP client's connection events and then starts the
         * client. Each connection event is forwarded to {@code onSocketConnected} with its
         * connected flag.
         */
        public final void start() {
            logInfoWithMDC("Create new AMQP bridge");
            final Action1<ConnectionChange> connectionListener = new Action1<ConnectionChange>() {
                @Override
                public void call(ConnectionChange change) {
                    onSocketConnected(change.getConnected());
                }
            };
            // Subscribe before starting the client so the subscription exists when it starts.
            this.connectedSubscription = this.amqpClient.getOnConnection().subscribe(connectionListener);
            this.amqpClient.start();
        }

        /**
         * Stops the bridge.
         *
         * <p>Under the bridge lock and the Artemis provider's monitor, the consumer is closed and
         * the session is stopped, and both references are cleared. After the lock is released the
         * AMQP client is stopped and the connection-event subscription is cancelled.
         */
        public final void stop() {
            logInfoWithMDC("Stopping AMQP bridge");
            ReentrantLock reentrantLock = this.lock;
            reentrantLock.lock();
            try {
                synchronized (this.artemis) {
                    ClientConsumer clientConsumer = this.consumer;
                    if (clientConsumer != null) {
                        clientConsumer.close();
                    }
                    this.consumer = null;
                    ClientSession clientSession = this.session;
                    if (clientSession != null) {
                        clientSession.stop();
                    }
                    this.session = null;
                }
            } finally {
                // Exactly one unlock per lock(). The client shutdown below stays outside this
                // try so that a failure there cannot trigger a second unlock(), which would
                // raise IllegalMonitorStateException and hide the original error.
                reentrantLock.unlock();
            }
            this.amqpClient.stop();
            Subscription subscription = this.connectedSubscription;
            if (subscription != null) {
                subscription.unsubscribe();
            }
            this.connectedSubscription = null;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Reacts to a connection state change reported by the AMQP client.
         *
         * <p>When connected: reports the event to the metrics service (if any), creates an Artemis
         * session and a consumer on {@code queueName}, installs the message handler and starts the
         * session. When disconnected: reports the event, closes the consumer, stops the session and
         * clears both references. All of this runs under the bridge lock and the Artemis
         * provider's monitor.
         *
         * @param z {@code true} if the socket is now connected, {@code false} if it disconnected
         */
        public final void onSocketConnected(boolean z) {
            ReentrantLock reentrantLock = this.lock;
            reentrantLock.lock();
            try {
                synchronized (this.artemis) {
                    if (z) {
                        logInfoWithMDC("Bridge Connected");
                        BridgeMetricsService bridgeMetricsService = this.bridgeMetricsService;
                        if (bridgeMetricsService != null) {
                            bridgeMetricsService.bridgeConnected(this.targets, this.legalNames);
                        }
                        ArtemisMessagingClient.Started started = this.artemis.getStarted();
                        if (started == null) {
                            // The Artemis client has not been started; fail fast.
                            Intrinsics.throwNpe();
                        }
                        // NOTE(review): per the Artemis createSession overload these flags are presumably
                        // xa=false, autoCommitSends=true, autoCommitAcks=true, preAcknowledge=false with
                        // an ack batch size of 1048576 - confirm against the Artemis API.
                        ClientSession createSession = started.getSessionFactory().createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, false, 1048576);
                        this.session = createSession;
                        ClientConsumer createConsumer = createSession.createConsumer(this.queueName);
                        this.consumer = createConsumer;
                        final AMQPBridgeManager$AMQPBridge$onSocketConnected$1$1$1 aMQPBridgeManager$AMQPBridge$onSocketConnected$1$1$1 = new AMQPBridgeManager$AMQPBridge$onSocketConnected$1$1$1(this);
                        createConsumer.setMessageHandler(new MessageHandler() { // from class: net.corda.nodeapi.internal.bridging.AMQPBridgeManager$sam$i$org_apache_activemq_artemis_api_core_client_MessageHandler$0
                            public final /* synthetic */ void onMessage(ClientMessage clientMessage) {
                                Intrinsics.checkExpressionValueIsNotNull(aMQPBridgeManager$AMQPBridge$onSocketConnected$1$1$1.invoke(clientMessage), "invoke(...)");
                            }
                        });
                        // Start delivery only after the handler is installed.
                        createSession.start();
                    } else {
                        logInfoWithMDC("Bridge Disconnected");
                        BridgeMetricsService bridgeMetricsService2 = this.bridgeMetricsService;
                        if (bridgeMetricsService2 != null) {
                            bridgeMetricsService2.bridgeDisconnected(this.targets, this.legalNames);
                        }
                        ClientConsumer clientConsumer = this.consumer;
                        if (clientConsumer != null) {
                            clientConsumer.close();
                        }
                        this.consumer = (ClientConsumer) null;
                        ClientSession clientSession2 = this.session;
                        if (clientSession2 != null) {
                            clientSession2.stop();
                        }
                        this.session = (ClientSession) null;
                    }
                }
            } finally {
                reentrantLock.unlock();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Handles one message consumed from the Artemis queue by forwarding it over the AMQP client.
         *
         * <p>Messages whose body is larger than the configured maximum are logged, reported to the
         * metrics service as dropped, acknowledged individually and not forwarded. Otherwise the
         * body and the whitelisted headers are copied into a new sendable message, a completion
         * callback is registered, and the message is written to the AMQP client.
         *
         * <p>The completion callback acknowledges the Artemis message when the send status is
         * {@code Acknowledged}; for any other status it commits the session and then rolls it back.
         * The same commit-then-rollback is performed when {@code write} throws
         * {@link IllegalStateException}.
         *
         * @param clientMessage the Artemis message to bridge
         */
        public final void clientArtemisMessageHandler(final ClientMessage clientMessage) {
            Unit unit;
            // Oversized message: drop it (log, report, acknowledge) instead of forwarding.
            if (clientMessage.getBodySize() > this.amqpConfig.getMaxMessageSize()) {
                String str = "Message exceeds maxMessageSize network parameter, maxMessageSize: [" + this.amqpConfig.getMaxMessageSize() + "], message size: [" + clientMessage.getBodySize() + "], dropping message, uuid: " + clientMessage.getObjectProperty("_AMQ_DUPL_ID");
                logWarnWithMDC(str);
                BridgeMetricsService bridgeMetricsService = this.bridgeMetricsService;
                if (bridgeMetricsService != null) {
                    bridgeMetricsService.packetDropEvent(clientMessage, str);
                }
                clientMessage.individualAcknowledge();
                return;
            }
            // Copy the message body out of the Artemis buffer.
            byte[] bArr = new byte[clientMessage.getBodySize()];
            clientMessage.getBodyBuffer().readBytes(bArr);
            // Copy only the whitelisted headers, converting SimpleString values to String.
            HashMap hashMap = new HashMap();
            for (String str2 : ArtemisMessagingComponent.Companion.P2PMessagingHeaders.INSTANCE.getWhitelistedHeaders()) {
                if (clientMessage.containsProperty(str2)) {
                    Object objectProperty = clientMessage.getObjectProperty(str2);
                    if (objectProperty instanceof SimpleString) {
                        objectProperty = objectProperty.toString();
                    }
                    hashMap.put(str2, objectProperty);
                }
            }
            logDebugWithMDC(new Function0<String>() { // from class: net.corda.nodeapi.internal.bridging.AMQPBridgeManager$AMQPBridge$clientArtemisMessageHandler$1
                @NotNull
                public final String invoke() {
                    return "Bridged Send to " + ((CordaX500Name) CollectionsKt.first(AMQPBridgeManager.AMQPBridge.this.getLegalNames())) + " uuid: " + clientMessage.getObjectProperty("_AMQ_DUPL_ID");
                }

                /* JADX INFO: Access modifiers changed from: package-private */
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(0);
                }
            });
            // Build the outgoing message: address derived from the local queue name, first legal name as the name argument.
            final SendableMessage createMessage = this.amqpClient.createMessage(bArr, ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress(this.queueName), ((CordaX500Name) CollectionsKt.first(this.legalNames)).toString(), hashMap);
            // Completion callback: acknowledge on success, otherwise commit then roll back the session.
            createMessage.getOnComplete().then(new Function1<CordaFuture<MessageStatus>, Object>() { // from class: net.corda.nodeapi.internal.bridging.AMQPBridgeManager$AMQPBridge$clientArtemisMessageHandler$2
                @Nullable
                public final Object invoke(@NotNull CordaFuture<MessageStatus> cordaFuture) {
                    ReentrantLock reentrantLock;
                    ClientSession clientSession;
                    ClientSession clientSession2;
                    ClientMessage clientMessage2;
                    Intrinsics.checkParameterIsNotNull(cordaFuture, "it");
                    AMQPBridgeManager.AMQPBridge.this.logDebugWithMDC(new Function0<String>() { // from class: net.corda.nodeapi.internal.bridging.AMQPBridgeManager$AMQPBridge$clientArtemisMessageHandler$2.1
                        @NotNull
                        public final String invoke() {
                            return "Bridge ACK " + ((MessageStatus) createMessage.getOnComplete().get());
                        }

                        {
                            super(0);
                        }
                    });
                    reentrantLock = AMQPBridgeManager.AMQPBridge.this.lock;
                    ReentrantLock reentrantLock2 = reentrantLock;
                    reentrantLock2.lock();
                    try {
                        if (((MessageStatus) createMessage.getOnComplete().get()) == MessageStatus.Acknowledged) {
                            clientMessage2 = clientMessage.individualAcknowledge();
                        } else {
                            AMQPBridgeManager.AMQPBridge.this.logInfoWithMDC("Rollback rejected message uuid: " + clientMessage.getObjectProperty("_AMQ_DUPL_ID"));
                            // Commit first, then roll back; the session may have been cleared, hence the null checks.
                            clientSession = AMQPBridgeManager.AMQPBridge.this.session;
                            if (clientSession != null) {
                                clientSession.commit();
                            }
                            clientSession2 = AMQPBridgeManager.AMQPBridge.this.session;
                            if (clientSession2 != null) {
                                clientSession2.rollback(false);
                                clientMessage2 = Unit.INSTANCE;
                            } else {
                                clientMessage2 = null;
                            }
                        }
                        return clientMessage2;
                    } finally {
                        reentrantLock2.unlock();
                    }
                }

                /* JADX INFO: Access modifiers changed from: package-private */
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(1);
                }
            });
            try {
                this.amqpClient.write(createMessage);
            } catch (IllegalStateException e) {
                // The write was rejected: log the reason and apply the same commit-then-rollback under the lock.
                ReentrantLock reentrantLock = this.lock;
                reentrantLock.lock();
                try {
                    String message = e.getMessage();
                    if (message != null) {
                        logInfoWithMDC(message);
                    }
                    logInfoWithMDC("Rollback rejected message uuid: " + clientMessage.getObjectProperty("_AMQ_DUPL_ID"));
                    ClientSession clientSession = this.session;
                    if (clientSession != null) {
                        clientSession.commit();
                    }
                    ClientSession clientSession2 = this.session;
                    if (clientSession2 != null) {
                        clientSession2.rollback(false);
                        unit = Unit.INSTANCE;
                    } else {
                        unit = null;
                    }
                } finally {
                    reentrantLock.unlock();
                }
            }
            // NOTE(review): this is also reached after the IllegalStateException path above - confirm that
            // reporting the packet as accepted in that case is intended.
            BridgeMetricsService bridgeMetricsService2 = this.bridgeMetricsService;
            if (bridgeMetricsService2 != null) {
                bridgeMetricsService2.packetAcceptedEvent(createMessage);
            }
        }

        /**
         * Returns the name of the Artemis queue this bridge consumes from.
         *
         * @return the queue name supplied at construction; never null
         */
        @NotNull
        public final String getQueueName() {
            return this.queueName;
        }

        /**
         * Returns the remote addresses this bridge was created with.
         *
         * @return the same list instance supplied at construction (not a copy); never null
         */
        @NotNull
        public final List<NetworkHostAndPort> getTargets() {
            return this.targets;
        }

        /**
         * Returns the legal names this bridge was created with.
         *
         * @return the same set instance supplied at construction (not a copy); never null
         */
        @NotNull
        public final Set<CordaX500Name> getLegalNames() {
            return this.legalNames;
        }

        /**
         * Creates a bridge for one queue. No connection is opened here; the AMQP client is only
         * constructed, and {@code start()} must be called to begin bridging.
         *
         * @param str                    queue name to consume from
         * @param list                   remote target addresses
         * @param set                    legal names of the remote party
         * @param aMQPConfiguration      AMQP settings
         * @param eventLoopGroup         Netty event loop group passed to the AMQP client
         * @param artemisSessionProvider provider of the Artemis client
         * @param bridgeMetricsService   optional metrics sink; may be null
         */
        public AMQPBridge(@NotNull String str, @NotNull List<NetworkHostAndPort> list, @NotNull Set<CordaX500Name> set, @NotNull AMQPConfiguration aMQPConfiguration, @NotNull EventLoopGroup eventLoopGroup, @NotNull ArtemisSessionProvider artemisSessionProvider, @Nullable BridgeMetricsService bridgeMetricsService) {
            // Kotlin-style non-null checks, in declaration order of the parameters.
            Intrinsics.checkParameterIsNotNull(str, "queueName");
            Intrinsics.checkParameterIsNotNull(list, "targets");
            Intrinsics.checkParameterIsNotNull(set, "legalNames");
            Intrinsics.checkParameterIsNotNull(aMQPConfiguration, "amqpConfig");
            Intrinsics.checkParameterIsNotNull(eventLoopGroup, "sharedEventGroup");
            Intrinsics.checkParameterIsNotNull(artemisSessionProvider, "artemis");

            // Plain state first.
            this.lock = new ReentrantLock();
            this.bridgeMetricsService = bridgeMetricsService;
            this.artemis = artemisSessionProvider;
            this.amqpConfig = aMQPConfiguration;
            this.legalNames = set;
            this.targets = list;
            this.queueName = str;

            // The client receives the same objects that were stored in the fields above.
            this.amqpClient = new AMQPClient(list, set, aMQPConfiguration, eventLoopGroup);
        }
    }

    /* compiled from: AMQPBridgeManager.kt */
    @Metadata(mv = {1, 1, 11}, bv = {1, AMQPBridgeManager.NUM_BRIDGE_THREADS, AMQPClient.NUM_CLIENT_THREADS}, k = 1, d1 = {"��&\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\b\n��\n\u0002\u0010\u000b\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\n\b\u0002\u0018��2\u00020\u0001B\u001f\b\u0016\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007¢\u0006\u0002\u0010\bB'\b\u0002\u0012\u0006\u0010\t\u001a\u00020\n\u0012\u0006\u0010\u000b\u001a\u00020\n\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007¢\u0006\u0002\u0010\fR\u0014\u0010\u0006\u001a\u00020\u0007X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\r\u0010\u000eR\u0014\u0010\t\u001a\u00020\nX\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u000f\u0010\u0010R\u0014\u0010\u0004\u001a\u00020\u0005X\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u0011\u0010\u0012R\u0014\u0010\u000b\u001a\u00020\nX\u0096\u0004¢\u0006\b\n��\u001a\u0004\b\u0013\u0010\u0010¨\u0006\u0014"}, d2 = {"Lnet/corda/nodeapi/internal/bridging/AMQPBridgeManager$AMQPConfigurationImpl;", "Lnet/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration;", "config", "Lnet/corda/nodeapi/internal/config/MutualSslConfiguration;", "maxMessageSize", "", "crlCheckSoftFail", "", "(Lnet/corda/nodeapi/internal/config/MutualSslConfiguration;IZ)V", "keyStore", "Lnet/corda/nodeapi/internal/config/CertificateStore;", "trustStore", "(Lnet/corda/nodeapi/internal/config/CertificateStore;Lnet/corda/nodeapi/internal/config/CertificateStore;IZ)V", "getCrlCheckSoftFail", "()Z", "getKeyStore", "()Lnet/corda/nodeapi/internal/config/CertificateStore;", "getMaxMessageSize", "()I", "getTrustStore", "node-api"})
    /* loaded from: input_file:net/corda/nodeapi/internal/bridging/AMQPBridgeManager$AMQPConfigurationImpl.class */
    /*
     * Immutable AMQPConfiguration backed by a key store, a trust store, a maximum
     * message size and a CRL soft-fail flag. Every value is fixed at construction.
     */
    private static final class AMQPConfigurationImpl implements AMQPConfiguration {

        @NotNull
        private final CertificateStore keyStore;

        @NotNull
        private final CertificateStore trustStore;
        private final int maxMessageSize;
        private final boolean crlCheckSoftFail;

        /** Returns the key store supplied at construction. */
        @Override // net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
        @NotNull
        public CertificateStore getKeyStore() {
            return this.keyStore;
        }

        /** Returns the trust store supplied at construction. */
        @Override // net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
        @NotNull
        public CertificateStore getTrustStore() {
            return this.trustStore;
        }

        /** Returns the maximum message size supplied at construction. */
        @Override // net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
        public int getMaxMessageSize() {
            return this.maxMessageSize;
        }

        /** Returns the CRL soft-fail flag supplied at construction. */
        @Override // net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
        public boolean getCrlCheckSoftFail() {
            return this.crlCheckSoftFail;
        }

        /** Stores the four configuration values as given; no validation is performed here. */
        private AMQPConfigurationImpl(CertificateStore certificateStore, CertificateStore certificateStore2, int i, boolean z) {
            this.keyStore = certificateStore;
            this.trustStore = certificateStore2;
            this.maxMessageSize = i;
            this.crlCheckSoftFail = z;
        }

        /**
         * Builds the configuration from the key store and trust store suppliers of an SSL configuration.
         *
         * @param mutualSslConfiguration source of the key store and trust store suppliers; must not be null
         * @param i maximum message size
         * @param z CRL soft-fail flag
         */
        public AMQPConfigurationImpl(@NotNull MutualSslConfiguration mutualSslConfiguration, int i, boolean z) {
            // Arguments are evaluated left to right, so the null check inside requireConfig runs
            // before the configuration is dereferenced anywhere in this call.
            this(CertificateStoreSupplier.DefaultImpls.get$default(requireConfig(mutualSslConfiguration).getKeyStore(), false, 1, null), CertificateStoreSupplier.DefaultImpls.get$default(mutualSslConfiguration.getTrustStore(), false, 1, null), i, z);
        }

        /** Applies the Kotlin non-null parameter check to the configuration and returns it unchanged. */
        private static MutualSslConfiguration requireConfig(MutualSslConfiguration mutualSslConfiguration) {
            Intrinsics.checkParameterIsNotNull(mutualSslConfiguration, "config");
            return mutualSslConfiguration;
        }
    }

    /* compiled from: AMQPBridgeManager.kt */
    @Metadata(mv = {1, 1, 11}, bv = {1, AMQPBridgeManager.NUM_BRIDGE_THREADS, AMQPClient.NUM_CLIENT_THREADS}, k = 1, d1 = {"��\u0012\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0010\b\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082T¢\u0006\u0002\n��¨\u0006\u0005"}, d2 = {"Lnet/corda/nodeapi/internal/bridging/AMQPBridgeManager$Companion;", "", "()V", "NUM_BRIDGE_THREADS", "", "node-api"})
    /* loaded from: input_file:net/corda/nodeapi/internal/bridging/AMQPBridgeManager$Companion.class */
    // Holder class for the enclosing manager's companion object (per the Kotlin metadata above).
    // It declares no fields or methods of its own in this file.
    public static final class Companion {
        // Private so that instances can only be created through the synthetic constructor below.
        private Companion() {
        }

        // Synthetic constructor: the marker argument is never read and only serves to
        // give this overload a distinct signature from the private no-arg constructor.
        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /**
     * Creates and starts a bridge for the given queue unless one already covers a requested target.
     *
     * <p>Under the manager lock, the bridge list for the queue is looked up (and registered if
     * absent). If any existing bridge on that queue already contains any of the requested targets,
     * the method returns without creating anything. Otherwise a new bridge is constructed, added to
     * the list and reported to the metrics service (when one is configured). The bridge is started
     * only after the lock has been released.
     *
     * @param str queue name the bridge is registered under
     * @param list target addresses for the bridge
     * @param set legal names passed to the bridge and to the metrics service
     */
    @Override // net.corda.nodeapi.internal.bridging.BridgeManager
    public void deployBridge(@NotNull String str, @NotNull List<NetworkHostAndPort> list, @NotNull Set<CordaX500Name> set) {
        Intrinsics.checkParameterIsNotNull(str, "queueName");
        Intrinsics.checkParameterIsNotNull(list, "targets");
        Intrinsics.checkParameterIsNotNull(set, "legalNames");
        AMQPBridge newBridge;
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            List<AMQPBridge> bridges = this.queueNamesToBridgesMap.get(str);
            if (bridges == null) {
                bridges = new ArrayList<AMQPBridge>();
                this.queueNamesToBridgesMap.put(str, bridges);
            }
            // Nothing to do when a bridge on this queue already serves one of the requested targets.
            for (NetworkHostAndPort target : list) {
                for (AMQPBridge existing : bridges) {
                    if (existing.getTargets().contains(target)) {
                        return;
                    }
                }
            }
            AMQPConfiguration aMQPConfiguration = this.amqpConfig;
            // Both fields are only populated by start(); fail with the Kotlin NPE if they are missing.
            EventLoopGroup eventLoopGroup = this.sharedEventLoopGroup;
            if (eventLoopGroup == null) {
                Intrinsics.throwNpe();
            }
            ArtemisSessionProvider artemisSessionProvider = this.artemis;
            if (artemisSessionProvider == null) {
                Intrinsics.throwNpe();
            }
            newBridge = new AMQPBridge(str, list, set, aMQPConfiguration, eventLoopGroup, artemisSessionProvider, this.bridgeMetricsService);
            bridges.add(newBridge);
            BridgeMetricsService bridgeMetricsService = this.bridgeMetricsService;
            if (bridgeMetricsService != null) {
                bridgeMetricsService.bridgeCreated(list, set);
            }
        } finally {
            // Single release point: unlocking here and again after start() would throw
            // IllegalMonitorStateException because the lock is no longer held.
            reentrantLock.unlock();
        }
        // Started outside the critical section so the manager lock is not held during start-up.
        newBridge.start();
    }

    /**
     * Stops and unregisters the bridges on a queue that serve the given targets.
     *
     * <p>For each target, the first bridge on the queue whose targets contain it is removed from
     * the queue's list, stopped, and reported to the metrics service (when one is configured).
     * When the list becomes empty the queue's entry is removed from the map. All of this happens
     * while holding the manager lock. Targets with no matching bridge are ignored.
     *
     * @param str queue name whose bridges are examined
     * @param list target addresses whose bridges should be destroyed
     */
    @Override // net.corda.nodeapi.internal.bridging.BridgeManager
    public void destroyBridge(@NotNull String str, @NotNull List<NetworkHostAndPort> list) {
        Intrinsics.checkParameterIsNotNull(str, "queueName");
        Intrinsics.checkParameterIsNotNull(list, "targets");
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            List<AMQPBridge> bridges = this.queueNamesToBridgesMap.get(str);
            if (bridges == null) {
                // No bridges are registered for this queue, so there is nothing to destroy.
                return;
            }
            for (NetworkHostAndPort target : list) {
                AMQPBridge match = null;
                for (AMQPBridge candidate : bridges) {
                    if (candidate.getTargets().contains(target)) {
                        match = candidate;
                        break;
                    }
                }
                if (match == null) {
                    continue;
                }
                // The search loop has finished, so removing from the list here is safe.
                bridges.remove(match);
                if (bridges.isEmpty()) {
                    this.queueNamesToBridgesMap.remove(str);
                }
                match.stop();
                BridgeMetricsService bridgeMetricsService = this.bridgeMetricsService;
                if (bridgeMetricsService != null) {
                    bridgeMetricsService.bridgeDestroyed(match.getTargets(), match.getLegalNames());
                }
            }
        } finally {
            reentrantLock.unlock();
        }
    }

    /**
     * Creates the shared event loop group (sized by {@code NUM_BRIDGE_THREADS}), then obtains a
     * session provider from the configured factory, stores it and starts it.
     */
    @Override // net.corda.nodeapi.internal.bridging.BridgeManager
    public void start() {
        final EventLoopGroup bridgeEventLoop = new NioEventLoopGroup(NUM_BRIDGE_THREADS);
        this.sharedEventLoopGroup = bridgeEventLoop;
        // The event loop group is assigned before the factory runs, matching the original order.
        final ArtemisSessionProvider sessionProvider = (ArtemisSessionProvider) this.artemisMessageClientFactory.invoke();
        this.artemis = sessionProvider;
        sessionProvider.start();
    }

    /** Stops the manager by delegating to {@link #close()}. */
    @Override // net.corda.nodeapi.internal.bridging.BridgeManager
    public void stop() {
        this.close();
    }

    /**
     * Shuts the manager down while holding the manager lock.
     *
     * <p>In order: stops every registered bridge, requests a graceful shutdown of the shared event
     * loop group and waits on its termination future, clears the event loop reference and the
     * queue-to-bridges map, and finally stops the session provider if one was created. The session
     * provider reference itself is left in place.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            // flatten() yields a separate list, so the map is not iterated directly while bridges stop.
            for (AMQPBridge bridge : CollectionsKt.flatten(this.queueNamesToBridgesMap.values())) {
                bridge.stop();
            }
            final EventLoopGroup group = this.sharedEventLoopGroup;
            if (group != null) {
                group.shutdownGracefully();
                final Future<?> terminated = group.terminationFuture();
                if (terminated != null) {
                    terminated.sync();
                }
            }
            this.sharedEventLoopGroup = null;
            this.queueNamesToBridgesMap.clear();
            final ArtemisSessionProvider sessionProvider = this.artemis;
            if (sessionProvider != null) {
                sessionProvider.stop();
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Creates a manager with an empty bridge registry; no event loop or session provider is
     * created here.
     *
     * @param mutualSslConfiguration SSL configuration used to build the AMQP configuration; must not be null
     * @param i maximum message size passed to the AMQP configuration
     * @param z CRL soft-fail flag passed to the AMQP configuration
     * @param function0 factory for the session provider, invoked by {@code start()}; must not be null
     * @param bridgeMetricsService optional metrics sink; may be null
     */
    public AMQPBridgeManager(@NotNull MutualSslConfiguration mutualSslConfiguration, int i, boolean z, @NotNull Function0<? extends ArtemisSessionProvider> function0, @Nullable BridgeMetricsService bridgeMetricsService) {
        Intrinsics.checkParameterIsNotNull(mutualSslConfiguration, "config");
        Intrinsics.checkParameterIsNotNull(function0, "artemisMessageClientFactory");
        // Internal state first, then collaborators, then the derived AMQP configuration.
        this.lock = new ReentrantLock();
        this.queueNamesToBridgesMap = new LinkedHashMap<String, List<AMQPBridge>>();
        this.bridgeMetricsService = bridgeMetricsService;
        this.artemisMessageClientFactory = function0;
        this.amqpConfig = new AMQPConfigurationImpl(mutualSslConfiguration, i, z);
    }

    /**
     * Synthetic default-argument constructor. When bit 16 of {@code i2} is set, the metrics service
     * argument is replaced by {@code null}; otherwise it is passed through. The marker argument is
     * never read.
     */
    public /* synthetic */ AMQPBridgeManager(MutualSslConfiguration mutualSslConfiguration, int i, boolean z, Function0 function0, BridgeMetricsService bridgeMetricsService, int i2, DefaultConstructorMarker defaultConstructorMarker) {
        this(mutualSslConfiguration, i, z, function0, (i2 & 16) == 0 ? bridgeMetricsService : null);
    }

    /**
     * Convenience constructor that supplies a session-provider factory creating an
     * {@code ArtemisMessagingClient} from the given SSL configuration, address and maximum message
     * size. No metrics service is configured (the default-argument constructor is invoked with
     * the mask bit for that parameter set).
     *
     * @param mutualSslConfiguration SSL configuration; must not be null
     * @param networkHostAndPort address handed to each created messaging client; must not be null
     * @param i maximum message size
     * @param z CRL soft-fail flag
     */
    public AMQPBridgeManager(@NotNull final MutualSslConfiguration mutualSslConfiguration, @NotNull final NetworkHostAndPort networkHostAndPort, final int i, boolean z) {
        this(mutualSslConfiguration, i, z, new Function0<ArtemisMessagingClient>() { // from class: net.corda.nodeapi.internal.bridging.AMQPBridgeManager.1
            // The factory captures the final constructor parameters; it runs only when invoked.
            @Override
            @NotNull
            public final ArtemisMessagingClient invoke() {
                return new ArtemisMessagingClient(mutualSslConfiguration, networkHostAndPort, i);
            }
        }, null, 16, null);
        // Java requires this(...) to come first, so these checks necessarily run after delegation.
        Intrinsics.checkParameterIsNotNull(mutualSslConfiguration, "config");
        Intrinsics.checkParameterIsNotNull(networkHostAndPort, "p2pAddress");
    }
}
