package ch.squaredesk.nova.comm.jms;

import ch.squaredesk.nova.comm.retrieving.IncomingMessage;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.Flowable;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.jms.Destination;

/* loaded from: input_file:ch/squaredesk/nova/comm/jms/RpcServer.class */
/**
 * JMS flavour of the generic RPC server: turns incoming messages that carry reply information
 * into {@link RpcInvocation}s and sends the reply (or an error reply) back through the
 * configured message sender.
 *
 * @param <InternalMessageType> the message type handed to and received from the sender/receiver
 */
public class RpcServer<InternalMessageType> extends ch.squaredesk.nova.comm.rpc.RpcServer<Destination, RpcInvocation<InternalMessageType>> {
    private final ch.squaredesk.nova.comm.sending.MessageSender<InternalMessageType, OutgoingMessageMetaData> messageSender;
    private final ch.squaredesk.nova.comm.retrieving.MessageReceiver<Destination, InternalMessageType, IncomingMessageMetaData> messageReceiver;
    private final Function<Throwable, InternalMessageType> errorReplyFactory;

    /**
     * Creates a new RPC server.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier of this constructor was
     * changed from package-private; it is kept {@code public} here so existing callers still work.
     *
     * @param str             passed through unchanged to the superclass constructor
     * @param messageReceiver source of incoming messages; must not be null
     * @param messageSender   used to send replies and error replies; must not be null
     * @param function        the error reply factory, converting a {@link Throwable} into the
     *                        message that is sent back as error reply; must not be null
     * @param metrics         passed through unchanged to the superclass constructor
     * @throws NullPointerException if messageSender, messageReceiver or function is null
     */
    public RpcServer(String str, ch.squaredesk.nova.comm.retrieving.MessageReceiver<Destination, InternalMessageType, IncomingMessageMetaData> messageReceiver, ch.squaredesk.nova.comm.sending.MessageSender<InternalMessageType, OutgoingMessageMetaData> messageSender, Function<Throwable, InternalMessageType> function, Metrics metrics) {
        super(str, metrics);
        Objects.requireNonNull(messageSender, "messageSender must not be null");
        Objects.requireNonNull(messageReceiver, "messageReceiver must not be null");
        Objects.requireNonNull(function, "errorReplyFactory must not be null");
        this.messageSender = messageSender;
        this.messageReceiver = messageReceiver;
        this.errorReplyFactory = function;
    }

    /**
     * Returns the stream of RPC requests arriving on the given destination.
     *
     * <p>Messages without reply destination or correlation id are dropped (see
     * {@link #isRpcRequest}). For every remaining message the metrics collector is notified and an
     * {@link RpcInvocation} is emitted whose reply callback sends the reply and then records
     * completion, and whose error callback records the failure and then sends an error reply.
     *
     * @param destination the destination to read requests from
     * @return one {@link RpcInvocation} per incoming RPC request
     */
    public Flowable<RpcInvocation<InternalMessageType>> requests(Destination destination) {
        return this.messageReceiver.messages(destination)
                .filter(this::isRpcRequest)
                .map(request -> {
                    this.metricsCollector.requestReceived(request.message);
                    Consumer<InternalMessageType> replyHandler = createReplyHandlerFor(request);
                    Consumer<Throwable> errorReplyHandler = createErrorReplyHandlerFor(request);
                    // Explicit type argument (instead of a raw type) so the callbacks are type-checked.
                    return new RpcInvocation<InternalMessageType>(
                            request,
                            replyInfo -> {
                                // Send first, then record completion (same order as before).
                                replyHandler.accept(replyInfo._1);
                                this.metricsCollector.requestCompleted(request.message, replyInfo);
                            },
                            error -> {
                                // Record the failure first, then send the error reply.
                                this.metricsCollector.requestCompletedExceptionally(request.message, error);
                                errorReplyHandler.accept(error);
                            });
                });
    }

    /**
     * Tells whether the message can be answered, i.e. whether its metadata, the metadata details,
     * the reply destination and the correlation id are all non-null.
     */
    private boolean isRpcRequest(IncomingMessage<InternalMessageType, IncomingMessageMetaData> incomingMessage) {
        RetrieveInfo retrieveInfo = retrieveInfoOf(incomingMessage);
        return retrieveInfo != null
                && retrieveInfo.replyDestination != null
                && retrieveInfo.correlationId != null;
    }

    /**
     * Creates the callback that sends a reply for the given request.
     *
     * <p>The outgoing metadata is computed once, when the handler is created.
     * NOTE(review): the send is subscribed without an error callback, so a failing send is not
     * handled here - confirm that this is intended.
     */
    private <RequestType extends InternalMessageType, ReplyType extends InternalMessageType> Consumer<ReplyType> createReplyHandlerFor(IncomingMessage<RequestType, IncomingMessageMetaData> incomingMessage) {
        OutgoingMessageMetaData replyMetaData = replyMetaDataFor(incomingMessage);
        return reply -> this.messageSender.doSend(reply, replyMetaData).subscribe();
    }

    /**
     * Creates the callback that converts an error with the error reply factory and sends the
     * resulting message as reply for the given request.
     *
     * <p>NOTE(review): as for regular replies, the send is subscribed without an error callback.
     */
    private Consumer<Throwable> createErrorReplyHandlerFor(IncomingMessage<InternalMessageType, IncomingMessageMetaData> incomingMessage) {
        OutgoingMessageMetaData replyMetaData = replyMetaDataFor(incomingMessage);
        return error -> this.messageSender.doSend(this.errorReplyFactory.apply(error), replyMetaData).subscribe();
    }

    /**
     * Builds the metadata for a reply to the given request: addressed to the request's reply
     * destination and carrying the request's correlation id; all other send details are null.
     *
     * @throws NullPointerException if the request has no metadata or no metadata details
     */
    private static OutgoingMessageMetaData replyMetaDataFor(IncomingMessage<?, IncomingMessageMetaData> incomingMessage) {
        RetrieveInfo retrieveInfo = retrieveInfoOf(incomingMessage);
        SendInfo sendInfo = new SendInfo(retrieveInfo.correlationId, null, null, null, null, null);
        return new OutgoingMessageMetaData(retrieveInfo.replyDestination, sendInfo);
    }

    /**
     * Extracts the JMS retrieve details of a message, or null if the message has no metadata or
     * the metadata has no details. The casts are kept from the decompiled code; they are
     * presumably redundant once the generic declarations are in effect.
     */
    private static RetrieveInfo retrieveInfoOf(IncomingMessage<?, IncomingMessageMetaData> incomingMessage) {
        IncomingMessageMetaData metaData = (IncomingMessageMetaData) incomingMessage.metaData;
        return metaData == null ? null : (RetrieveInfo) metaData.details;
    }
}
