package net.corda.node.services.messaging;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import kotlin.Metadata;
import kotlin.collections.ArraysKt;
import kotlin.collections.CollectionsKt;
import kotlin.collections.MapsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.StringsKt;
import net.corda.core.ErrorOr;
import net.corda.core.serialization.KryoKt;
import net.corda.core.serialization.SerializedBytes;
import net.corda.node.services.RPCUserService;
import net.corda.node.services.User;
import net.corda.node.services.messaging.RPCDispatcher;
import net.corda.node.utilities.AffinityExecutor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import rx.Notification;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;

/* compiled from: RPCDispatcher.kt */
@Metadata(mv = {1, 1, 1}, bv = {1, 0, 0}, k = 1, d1 = {"��p\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010$\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010\u000b\n\u0002\b\u0005\b&\u0018��2\u00020\u0001:\u0001-B\u0015\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005¢\u0006\u0002\u0010\u0006J\u000e\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u0015\u001a\u00020\u0016J\u0010\u0010\u0017\u001a\u00020\u00182\u0006\u0010\u0019\u001a\u00020\u001aH\u0015J\u001c\u0010\u001b\u001a\u00020\u00142\n\u0010\u001c\u001a\u0006\u0012\u0002\b\u00030\u001d2\u0006\u0010\u001e\u001a\u00020\tH&J \u0010\u001f\u001a\u00020\u00142\u0006\u0010 \u001a\u00020!2\b\u0010\"\u001a\u0004\u0018\u00010!2\u0006\u0010#\u001a\u00020$J&\u0010%\u001a\u0004\u0018\u00010\t*\u00020\u001a2\u0006\u0010&\u001a\u00020\u00182\u0006\u0010'\u001a\u00020\t2\u0006\u0010(\u001a\u00020)H\u0002J\u0014\u0010*\u001a\u00020\t*\u00020\u001a2\u0006\u0010+\u001a\u00020\tH\u0002J\f\u0010,\u001a\u00020\u0016*\u00020\u001aH\u0002R*\u0010\u0007\u001a\u001e\u0012\f\u0012\n \n*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \n*\u0004\u0018\u00010\u000b0\u000b0\bX\u0082\u0004¢\u0006\u0002\n��R\u0011\u0010\u0002\u001a\u00020\u0003¢\u0006\b\n��\u001a\u0004\b\f\u0010\rRN\u0010\u000e\u001aB\u0012\f\u0012\n \n*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \n*\u0004\u0018\u00010\u00100\u0010 \n* \u0012\f\u0012\n \n*\u0004\u0018\u00010\t0\t\u0012\f\u0012\n \n*\u0004\u0018\u00010\u00100\u0010\u0018\u00010\u000f0\u000fX\u0082\u0004¢\u0006\u0002\n��R\u0011\u0010\u0004\u001a\u00020\u0005¢\u0006\b\n��\u001a\u0004\b\u0011\u0010\u0012¨\u0006."}, d2 = {"Lnet/corda/node/services/messaging/RPCDispatcher;", "", "ops", "Lnet/corda/node/services/messaging/RPCOps;", "userService", "Lnet/corda/node/services/RPCUserService;", "(Lnet/corda/node/services/messaging/RPCOps;Lnet/corda/node/services/RPCUserService;)V", "methodTable", "", "", "kotlin.jvm.PlatformType", "Ljava/lang/reflect/Method;", "getOps", "()Lnet/corda/node/services/messaging/RPCOps;", "queueToSubscription", "Lcom/google/common/collect/HashMultimap;", "Lrx/Subscription;", "getUserService", "()Lnet/corda/node/services/RPCUserService;", "dispatch", "", "msg", "Lnet/corda/node/services/messaging/ClientRPCRequestMessage;", "getUser", "Lnet/corda/node/services/User;", "message", "Lorg/apache/activemq/artemis/api/core/client/ClientMessage;", "send", "data", "Lnet/corda/core/serialization/SerializedBytes;", "toAddress", "start", "rpcConsumer", "Lorg/apache/activemq/artemis/api/core/client/ClientConsumer;", "rpcNotificationConsumer", "onExecutor", "Lnet/corda/node/utilities/AffinityExecutor;", "getAuthenticatedAddress", "user", "property", "required", "", "requiredString", "name", "toRPCRequestMessage", "ObservableSerializer", "node_main"})
/* loaded from: input_file:net/corda/node/services/messaging/RPCDispatcher.class */
public abstract class RPCDispatcher {
    private final Map<String, Method> methodTable;
    private final HashMultimap<String, Subscription> queueToSubscription;

    @NotNull
    private final RPCOps ops;

    @NotNull
    private final RPCUserService userService;

    /* compiled from: RPCDispatcher.kt */
    @Metadata(mv = {1, 1, 1}, bv = {1, 0, 0}, k = 1, d1 = {"��B\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0082\u0004\u0018��2\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00030\u00020\u0001B\r\u0012\u0006\u0010\u0004\u001a\u00020\u0005¢\u0006\u0002\u0010\u0006J2\u0010\t\u001a\b\u0012\u0004\u0012\u00020\u00030\u00022\u0006\u0010\n\u001a\u00020\u000b2\u0006\u0010\f\u001a\u00020\r2\u0012\u0010\u000e\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00030\u00020\u000fH\u0016J&\u0010\u0010\u001a\u00020\u00112\u0006\u0010\n\u001a\u00020\u000b2\u0006\u0010\u0012\u001a\u00020\u00132\f\u0010\u0014\u001a\b\u0012\u0004\u0012\u00020\u00030\u0002H\u0016R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0015"}, d2 = {"Lnet/corda/node/services/messaging/RPCDispatcher$ObservableSerializer;", "Lcom/esotericsoftware/kryo/Serializer;", "Lrx/Observable;", "", "toQName", "", "(Lnet/corda/node/services/messaging/RPCDispatcher;Ljava/lang/String;)V", "handleCounter", "Ljava/util/concurrent/atomic/AtomicInteger;", "read", "kryo", "Lcom/esotericsoftware/kryo/Kryo;", "input", "Lcom/esotericsoftware/kryo/io/Input;", "type", "Ljava/lang/Class;", "write", "", "output", "Lcom/esotericsoftware/kryo/io/Output;", "obj", "node_main"})
    /* loaded from: input_file:net/corda/node/services/messaging/RPCDispatcher$ObservableSerializer.class */
    private final class ObservableSerializer extends Serializer<Observable<Object>> {
        private final AtomicInteger handleCounter;
        private final String toQName;
        final /* synthetic */ RPCDispatcher this$0;

        @NotNull
        public Observable<Object> read(@NotNull Kryo kryo, @NotNull Input input, @NotNull Class<Observable<Object>> cls) {
            Intrinsics.checkParameterIsNotNull(kryo, "kryo");
            Intrinsics.checkParameterIsNotNull(input, "input");
            Intrinsics.checkParameterIsNotNull(cls, "type");
            throw new UnsupportedOperationException("not implemented");
        }

        /* renamed from: read, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m61read(Kryo kryo, Input input, Class cls) {
            return read(kryo, input, (Class<Observable<Object>>) cls);
        }

        public void write(@NotNull Kryo kryo, @NotNull Output output, @NotNull Observable<Object> observable) {
            Intrinsics.checkParameterIsNotNull(kryo, "kryo");
            Intrinsics.checkParameterIsNotNull(output, "output");
            Intrinsics.checkParameterIsNotNull(observable, "obj");
            final int andIncrement = this.handleCounter.getAndIncrement();
            output.writeInt(andIncrement, true);
            Subscription subscribe = observable.materialize().subscribe(new Action1<Notification<Object>>() { // from class: net.corda.node.services.messaging.RPCDispatcher$ObservableSerializer$write$subscription$1
                public final void call(@NotNull Notification<? extends Object> notification) {
                    String str;
                    Intrinsics.checkParameterIsNotNull(notification, "materialised");
                    SerializedBytes<?> serialize = KryoKt.serialize(new MarshalledObservation(andIncrement, notification), RPCStructures.createRPCKryo(RPCDispatcher.ObservableSerializer.this));
                    RPCStructures.getRpcLog().debug("RPC sending observation: " + notification);
                    RPCDispatcher rPCDispatcher = RPCDispatcher.ObservableSerializer.this.this$0;
                    str = RPCDispatcher.ObservableSerializer.this.toQName;
                    rPCDispatcher.send(serialize, str);
                }
            });
            HashMultimap hashMultimap = this.this$0.queueToSubscription;
            Intrinsics.checkExpressionValueIsNotNull(hashMultimap, "queueToSubscription");
            synchronized (hashMultimap) {
                this.this$0.queueToSubscription.put(this.toQName, subscribe);
            }
        }

        public ObservableSerializer(@NotNull RPCDispatcher rPCDispatcher, String str) {
            Intrinsics.checkParameterIsNotNull(str, "toQName");
            this.this$0 = rPCDispatcher;
            this.toQName = str;
            this.handleCounter = new AtomicInteger();
        }
    }

    public final void dispatch(@NotNull ClientRPCRequestMessage clientRPCRequestMessage) {
        ErrorOr of;
        SerializedBytes<?> serialize;
        Method method;
        Intrinsics.checkParameterIsNotNull(clientRPCRequestMessage, "msg");
        SerializedBytes<Object[]> component1 = clientRPCRequestMessage.component1();
        String component2 = clientRPCRequestMessage.component2();
        String component3 = clientRPCRequestMessage.component3();
        String component4 = clientRPCRequestMessage.component4();
        ErrorOr.Companion companion = ErrorOr.Companion;
        try {
            method = this.methodTable.get(component4);
        } catch (Throwable th) {
            of = ErrorOr.Companion.of(th);
        }
        if (method == null) {
            throw new RPCException("Received RPC for unknown method " + component4 + " - possible client/server version skew?");
        }
        if (method.isAnnotationPresent(RPCReturnsObservables.class) && component3 == null) {
            throw new RPCException("Received RPC without any destination for observations, but the RPC returns observables");
        }
        Object[] objArr = (Object[]) KryoKt.deserialize$default(component1, (Kryo) null, 1, (Object) null);
        Logger rpcLog = RPCStructures.getRpcLog();
        if (rpcLog.isDebugEnabled()) {
            rpcLog.debug("-> RPC -> " + component4 + "(" + ArraysKt.joinToString$default(objArr, (CharSequence) null, (CharSequence) null, (CharSequence) null, 0, (CharSequence) null, (Function1) null, 63, (Object) null) + ")    [reply to " + component2 + "]");
        }
        try {
            of = new ErrorOr(method.invoke(this.ops, Arrays.copyOf(objArr, objArr.length)));
            ErrorOr errorOr = of;
            Logger rpcLog2 = RPCStructures.getRpcLog();
            if (rpcLog2.isDebugEnabled()) {
                rpcLog2.debug("<- RPC <- " + component4 + " = " + errorOr + " ");
            }
            Kryo createRPCKryo = RPCStructures.createRPCKryo(component3 != null ? new ObservableSerializer(this, component3) : (ObservableSerializer) null);
            try {
                serialize = KryoKt.serialize(errorOr, createRPCKryo);
            } catch (KryoException e) {
                RPCStructures.getRpcLog().error("Failed to respond to inbound RPC " + component4, e);
                serialize = KryoKt.serialize(ErrorOr.Companion.of(e), createRPCKryo);
            }
            send(serialize, component2);
        } catch (InvocationTargetException e2) {
            Throwable cause = e2.getCause();
            if (cause != null) {
                throw cause;
            }
            Intrinsics.throwNpe();
            throw cause;
        }
    }

    public abstract void send(@NotNull SerializedBytes<?> serializedBytes, @NotNull String str);

    public final void start(@NotNull ClientConsumer clientConsumer, @Nullable ClientConsumer clientConsumer2, @NotNull final AffinityExecutor affinityExecutor) {
        Intrinsics.checkParameterIsNotNull(clientConsumer, "rpcConsumer");
        Intrinsics.checkParameterIsNotNull(affinityExecutor, "onExecutor");
        if (clientConsumer2 != null) {
            clientConsumer2.setMessageHandler(new MessageHandler() { // from class: net.corda.node.services.messaging.RPCDispatcher$start$1
                public final void onMessage(ClientMessage clientMessage) {
                    Set removeAll;
                    String stringProperty = clientMessage.getStringProperty("_AMQ_RoutingName");
                    HashMultimap hashMultimap = RPCDispatcher.this.queueToSubscription;
                    Intrinsics.checkExpressionValueIsNotNull(hashMultimap, "queueToSubscription");
                    synchronized (hashMultimap) {
                        removeAll = RPCDispatcher.this.queueToSubscription.removeAll(stringProperty);
                    }
                    Set set = removeAll;
                    if (!set.isEmpty()) {
                        RPCStructures.getRpcLog().debug("Observable queue was deleted, unsubscribing: " + stringProperty);
                        Iterator it = set.iterator();
                        while (it.hasNext()) {
                            ((Subscription) it.next()).unsubscribe();
                        }
                    }
                }
            });
        }
        clientConsumer.setMessageHandler(new MessageHandler() { // from class: net.corda.node.services.messaging.RPCDispatcher$start$2
            public final void onMessage(final ClientMessage clientMessage) {
                clientMessage.acknowledge();
                affinityExecutor.execute(new Runnable() { // from class: net.corda.node.services.messaging.RPCDispatcher$start$2.1
                    @Override // java.lang.Runnable
                    public final void run() {
                        ClientRPCRequestMessage rPCRequestMessage;
                        try {
                            try {
                                try {
                                    rPCRequestMessage = RPCDispatcher.this.toRPCRequestMessage(clientMessage);
                                    RPCStructures.CURRENT_RPC_USER.set(rPCRequestMessage.getUser());
                                    RPCDispatcher.this.dispatch(rPCRequestMessage);
                                    RPCStructures.CURRENT_RPC_USER.remove();
                                } catch (RPCException e) {
                                    RPCStructures.getRpcLog().warn("Received malformed client RPC message: " + e.getMessage());
                                    RPCStructures.getRpcLog().trace("RPC exception", e);
                                    RPCStructures.CURRENT_RPC_USER.remove();
                                }
                            } catch (Throwable th) {
                                RPCStructures.getRpcLog().error("Uncaught exception when dispatching client RPC", th);
                                RPCStructures.CURRENT_RPC_USER.remove();
                            }
                        } catch (Throwable th2) {
                            RPCStructures.CURRENT_RPC_USER.remove();
                            throw th2;
                        }
                    }
                });
            }
        });
    }

    private final String requiredString(@NotNull ClientMessage clientMessage, String str) {
        String stringProperty = clientMessage.getStringProperty(str);
        if (stringProperty != null) {
            return stringProperty;
        }
        throw new RPCException("missing " + str + " property");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final ClientRPCRequestMessage toRPCRequestMessage(@NotNull ClientMessage clientMessage) {
        User user = getUser(clientMessage);
        String authenticatedAddress = getAuthenticatedAddress(clientMessage, user, ClientRPCRequestMessage.REPLY_TO, true);
        if (authenticatedAddress == null) {
            Intrinsics.throwNpe();
        }
        String authenticatedAddress2 = getAuthenticatedAddress(clientMessage, user, ClientRPCRequestMessage.OBSERVATIONS_TO, false);
        byte[] bArr = new byte[clientMessage.getBodySize()];
        clientMessage.getBodyBuffer().readBytes(bArr);
        byte[] bArr2 = bArr;
        if (bArr2.length == 0) {
            throw new RPCException("empty serialized args");
        }
        return new ClientRPCRequestMessage(new SerializedBytes(bArr2), authenticatedAddress, authenticatedAddress2, requiredString(clientMessage, ClientRPCRequestMessage.METHOD_NAME), user);
    }

    @VisibleForTesting
    @NotNull
    protected User getUser(@NotNull ClientMessage clientMessage) {
        Intrinsics.checkParameterIsNotNull(clientMessage, "message");
        RPCUserService rPCUserService = this.userService;
        String simpleString = Message.HDR_VALIDATED_USER.toString();
        Intrinsics.checkExpressionValueIsNotNull(simpleString, "Message.HDR_VALIDATED_USER.toString()");
        User user = rPCUserService.getUser(requiredString(clientMessage, simpleString));
        if (user == null) {
            Intrinsics.throwNpe();
        }
        return user;
    }

    private final String getAuthenticatedAddress(@NotNull ClientMessage clientMessage, User user, String str, boolean z) {
        String requiredString = z ? requiredString(clientMessage, str) : clientMessage.getStringProperty(str);
        String str2 = ArtemisMessagingComponent.CLIENTS_PREFIX + user.getUsername() + ".";
        if (requiredString == null || StringsKt.startsWith$default(requiredString, str2, false, 2, (Object) null)) {
            return requiredString;
        }
        throw new RPCException(str + " address does not match up with the user");
    }

    @NotNull
    public final RPCOps getOps() {
        return this.ops;
    }

    @NotNull
    public final RPCUserService getUserService() {
        return this.userService;
    }

    public RPCDispatcher(@NotNull RPCOps rPCOps, @NotNull RPCUserService rPCUserService) {
        Object obj;
        Intrinsics.checkParameterIsNotNull(rPCOps, "ops");
        Intrinsics.checkParameterIsNotNull(rPCUserService, "userService");
        this.ops = rPCOps;
        this.userService = rPCUserService;
        Method[] declaredMethods = this.ops.getClass().getDeclaredMethods();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Method method : declaredMethods) {
            String name = method.getName();
            Object obj2 = linkedHashMap.get(name);
            if (obj2 == null) {
                ArrayList arrayList = new ArrayList();
                linkedHashMap.put(name, arrayList);
                obj = arrayList;
            } else {
                obj = obj2;
            }
            ((List) obj).add(method);
        }
        LinkedHashMap linkedHashMap2 = new LinkedHashMap(MapsKt.mapCapacity(linkedHashMap.size()));
        for (Object obj3 : linkedHashMap.entrySet()) {
            linkedHashMap2.put(((Map.Entry) obj3).getKey(), (Method) CollectionsKt.single((List) ((Map.Entry) obj3).getValue()));
        }
        this.methodTable = linkedHashMap2;
        this.queueToSubscription = HashMultimap.create();
    }
}
