package net.corda.nodeapi.internal.protonwrapper.engine;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import kotlin.Metadata;
import kotlin.TypeCastException;
import kotlin.Unit;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus;
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl;
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl;
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient;
import net.corda.nodeapi.internal.serialization.amqp.CorDappCustomSerializerKt;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.apache.qpid.proton.reactor.FlowController;
import org.apache.qpid.proton.reactor.Handshaker;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: EventProcessor.kt */
@Metadata(mv = {CorDappCustomSerializerKt.PROXY_TYPE, CorDappCustomSerializerKt.PROXY_TYPE, 8}, bv = {CorDappCustomSerializerKt.PROXY_TYPE, CorDappCustomSerializerKt.CORDAPP_TYPE, AMQPClient.NUM_CLIENT_THREADS}, k = CorDappCustomSerializerKt.PROXY_TYPE, d1 = {"��~\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000b\n��\n\u0002\u0010\u000e\n\u0002\b\u0005\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0010!\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\b��\u0018�� 32\u00020\u0001:\u00013B9\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\u0007\u0012\b\u0010\t\u001a\u0004\u0018\u00010\u0007\u0012\b\u0010\n\u001a\u0004\u0018\u00010\u0007¢\u0006\u0002\u0010\u000bJ\u000e\u0010\u001f\u001a\u00020\u00052\u0006\u0010 
\u001a\u00020\u0016J\u0006\u0010!\u001a\u00020\"J\u000e\u0010#\u001a\u00020\"2\u0006\u0010$\u001a\u00020%J\n\u0010&\u001a\u0004\u0018\u00010'H\u0002J\u0006\u0010(\u001a\u00020\"J\u0006\u0010)\u001a\u00020\"J\u0010\u0010*\u001a\u00020\"2\u0006\u0010\u000e\u001a\u00020\u000fH\u0002J\u000e\u0010+\u001a\u00020\"2\u0006\u0010,\u001a\u00020-J\u000e\u0010.\u001a\u00020\"2\u0006\u0010/\u001a\u000200J\u000e\u00101\u001a\u00020\"2\u0006\u0010,\u001a\u000202R\u000e\u0010\f\u001a\u00020\rX\u0082\u0004¢\u0006\u0002\n��R\u0011\u0010\u000e\u001a\u00020\u000f¢\u0006\b\n��\u001a\u0004\b\u0010\u0010\u0011R\u000e\u0010\u0012\u001a\u00020\u0013X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0014\u001a\b\u0012\u0004\u0012\u00020\u00160\u0015X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0017\u001a\u00020\u0018X\u0082\u0004¢\u0006\u0002\n��R\u0016\u0010\u0019\u001a\n \u001b*\u0004\u0018\u00010\u001a0\u001aX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u001c\u001a\u00020\u0005X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u001d\u001a\u00020\u001eX\u0082\u0004¢\u0006\u0002\n��¨\u00064"}, d2 = {"Lnet/corda/nodeapi/internal/protonwrapper/engine/EventProcessor;", "Lorg/apache/qpid/proton/engine/BaseHandler;", "channel", "Lio/netty/channel/Channel;", "serverMode", "", "localLegalName", "", "remoteLegalName", "userName", "password", "(Lio/netty/channel/Channel;ZLjava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V", "collector", "Lorg/apache/qpid/proton/engine/impl/CollectorImpl;", "connection", "Lorg/apache/qpid/proton/engine/Connection;", "getConnection", "()Lorg/apache/qpid/proton/engine/Connection;", "executor", "Ljava/util/concurrent/ScheduledExecutorService;", "handlers", "", "Lorg/apache/qpid/proton/engine/Handler;", "lock", "Ljava/util/concurrent/locks/ReentrantLock;", "log", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "pendingExecute", "stateMachine", "Lnet/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine;", "addHandler", "handler", "close", "", 
"complete", "completer", "Lnet/corda/nodeapi/internal/protonwrapper/messages/impl/ReceivedMessageImpl$MessageCompleter;", "popEvent", "Lorg/apache/qpid/proton/engine/Event;", "processEvents", "processEventsAsync", "tick", "transportProcessInput", "msg", "Lio/netty/buffer/ByteBuf;", "transportProcessOutput", "ctx", "Lio/netty/channel/ChannelHandlerContext;", "transportWriteMessage", "Lnet/corda/nodeapi/internal/protonwrapper/messages/impl/SendableMessageImpl;", "Companion", "node-api"})
/* loaded from: input_file:net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.class */
public final class EventProcessor extends BaseHandler {
    private final Logger log;
    private final ReentrantLock lock;
    private boolean pendingExecute;
    private final ScheduledExecutorService executor;
    private final CollectorImpl collector;
    private final List<Handler> handlers;
    private final ConnectionStateMachine stateMachine;

    @NotNull
    private final Connection connection;
    private static final int FLOW_WINDOW_SIZE = 10;
    public static final Companion Companion = new Companion(null);

    /* compiled from: EventProcessor.kt */
    @Metadata(mv = {CorDappCustomSerializerKt.PROXY_TYPE, CorDappCustomSerializerKt.PROXY_TYPE, 8}, bv = {CorDappCustomSerializerKt.PROXY_TYPE, CorDappCustomSerializerKt.CORDAPP_TYPE, AMQPClient.NUM_CLIENT_THREADS}, k = CorDappCustomSerializerKt.PROXY_TYPE, d1 = {"��\u0012\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0010\b\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082T¢\u0006\u0002\n��¨\u0006\u0005"}, d2 = {"Lnet/corda/nodeapi/internal/protonwrapper/engine/EventProcessor$Companion;", "", "()V", "FLOW_WINDOW_SIZE", "", "node-api"})
    /* loaded from: input_file:net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor$Companion.class */
    public static final class Companion {
        // Holder type emitted for the Kotlin companion object; it carries no state of its own.

        /** Not instantiable directly; the only instance is created through the synthetic constructor below. */
        private Companion() {
            super();
        }

        /**
         * Synthetic bridge constructor that delegates to the private no-arg constructor.
         *
         * @param defaultConstructorMarker marker argument; its value is not used
         */
        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /**
     * Returns the Proton connection held by this processor.
     *
     * @return the connection assigned in the constructor from the state machine
     */
    @NotNull
    public final Connection getConnection() {
        return connection;
    }

    /**
     * Appends a handler to the list that {@link #processEvents()} dispatches every event to.
     *
     * @param handler the handler to register; must not be null
     * @return the result of {@code List.add} on the internal handler list
     */
    public final boolean addHandler(@NotNull Handler handler) {
        Intrinsics.checkParameterIsNotNull(handler, "handler");
        final boolean added = handlers.add(handler);
        return added;
    }

    /**
     * Takes the next event off the collector, if there is one.
     *
     * @return a copy of the event at the head of the collector, or {@code null} when the collector is empty
     */
    private final Event popEvent() {
        final Event head = collector.peek();
        if (head == null) {
            return null;
        }
        // Copy before popping: the copy is what gets handed out, the original is discarded from the collector.
        final Event snapshot = head.copy();
        collector.pop();
        return snapshot;
    }

    /**
     * Ticks the connection's transport and re-schedules itself on the executor.
     *
     * <p>While the connection's local state is not {@code CLOSED} and its transport is not closed, the
     * transport is ticked with the current wall-clock time and a task is scheduled that calls this method
     * again followed by {@link #processEvents()}. The value returned by {@code Transport.tick(now)} is
     * treated as an absolute timestamp: the delay is that value minus {@code now}, clamped at zero.
     *
     * <p>If anything in that sequence throws, the failure is logged, the transport is closed and an empty
     * {@link ErrorCondition} is set on the connection; the exception is not propagated.
     *
     * <p>The method was private in the Kotlin source; the decompiler widened it so the scheduled task can
     * call it.
     *
     * @param connection the connection whose transport is ticked
     */
    public final void tick(final Connection connection) {
        final ReentrantLock guard = this.lock;
        guard.lock();
        try {
            try {
                final boolean locallyOpen = !Intrinsics.areEqual(connection.getLocalState(), EndpointState.CLOSED);
                if (locallyOpen && !connection.getTransport().isClosed()) {
                    final long now = System.currentTimeMillis();
                    // NOTE(review): a tick() result of 0 would make the delay 0 and re-run immediately —
                    // confirm the transport always has an idle timeout configured.
                    final long delayMillis = Math.max(0L, connection.getTransport().tick(now) - now);
                    this.executor.schedule(new Runnable() {
                        @Override
                        public void run() {
                            EventProcessor.this.tick(connection);
                            EventProcessor.this.processEvents();
                        }
                    }, delayMillis, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                // Previously the exception was dropped silently; record it so the reason for the close is visible.
                this.log.warn("Transport tick failed, closing transport", e);
                connection.getTransport().close();
                connection.setCondition(new ErrorCondition());
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Drains the collector and dispatches every event to all registered handlers, then lets the state
     * machine process the transport. Runs entirely under the processor lock and clears the
     * {@code pendingExecute} flag first, so a later {@link #processEventsAsync()} can queue a new run.
     */
    public final void processEvents() {
        final ReentrantLock guard = lock;
        guard.lock();
        try {
            pendingExecute = false;
            if (log.isDebugEnabled()) {
                log.debug("Process Events");
            }
            // Keep pulling events until the collector reports none left.
            for (Event event = popEvent(); event != null; event = popEvent()) {
                if (log.isDebugEnabled()) {
                    log.debug("Process event: " + event);
                }
                for (Handler handler : handlers) {
                    handler.handle(event);
                }
            }
            stateMachine.processTransport();
            if (log.isDebugEnabled()) {
                log.debug("Process Events Done");
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Queues a {@link #processEvents()} run on the executor unless one is already pending.
     *
     * <p>The {@code pendingExecute} flag is set here and cleared at the start of {@code processEvents()},
     * so repeated calls before the queued task runs submit only one task.
     */
    public final void processEventsAsync() {
        final ReentrantLock guard = lock;
        guard.lock();
        try {
            if (pendingExecute) {
                return;
            }
            pendingExecute = true;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    EventProcessor.this.processEvents();
                }
            });
        } finally {
            guard.unlock();
        }
    }

    /**
     * Closes and frees the connection, processing the resulting events after each step.
     * Does nothing when the connection's local state is already {@code CLOSED}.
     */
    public final void close() {
        final boolean alreadyClosed = Intrinsics.areEqual(connection.getLocalState(), EndpointState.CLOSED);
        if (alreadyClosed) {
            return;
        }
        connection.close();
        processEvents();
        connection.free();
        processEvents();
    }

    /**
     * Forwards an inbound buffer to the state machine while holding the processor lock.
     *
     * @param byteBuf the received bytes; must not be null
     */
    public final void transportProcessInput(@NotNull ByteBuf byteBuf) {
        Intrinsics.checkParameterIsNotNull(byteBuf, "msg");
        final ReentrantLock guard = lock;
        guard.lock();
        try {
            stateMachine.transportProcessInput(byteBuf);
        } finally {
            guard.unlock();
        }
    }

    /**
     * Asks the state machine to process transport output for the given channel context, under the
     * processor lock.
     *
     * @param channelHandlerContext the Netty context passed through to the state machine; must not be null
     */
    public final void transportProcessOutput(@NotNull ChannelHandlerContext channelHandlerContext) {
        Intrinsics.checkParameterIsNotNull(channelHandlerContext, "ctx");
        final ReentrantLock guard = lock;
        guard.lock();
        try {
            stateMachine.transportProcessOutput(channelHandlerContext);
        } finally {
            guard.unlock();
        }
    }

    /**
     * Hands an outbound message to the state machine while holding the processor lock.
     *
     * @param sendableMessageImpl the message to write; must not be null
     */
    public final void transportWriteMessage(@NotNull SendableMessageImpl sendableMessageImpl) {
        Intrinsics.checkParameterIsNotNull(sendableMessageImpl, "msg");
        final ReentrantLock guard = lock;
        guard.lock();
        try {
            stateMachine.transportWriteMessage(sendableMessageImpl);
        } finally {
            guard.unlock();
        }
    }

    /**
     * Applies the completer's outcome to its delivery and settles it, under the processor lock.
     *
     * <p>A status of {@code MessageStatus.Acknowledged} maps to the shared {@link Accepted} instance;
     * any other status maps to a new {@link Rejected}.
     *
     * @param messageCompleter supplies the status and the delivery to settle; must not be null
     */
    public final void complete(@NotNull ReceivedMessageImpl.MessageCompleter messageCompleter) {
        Intrinsics.checkParameterIsNotNull(messageCompleter, "completer");
        final ReentrantLock guard = lock;
        guard.lock();
        try {
            final boolean acknowledged = Intrinsics.areEqual(messageCompleter.getStatus(), MessageStatus.Acknowledged);
            final DeliveryState outcome;
            if (acknowledged) {
                final Accepted accepted = Accepted.getInstance();
                Intrinsics.checkExpressionValueIsNotNull(accepted, "Accepted.getInstance()");
                outcome = (DeliveryState) accepted;
            } else {
                outcome = new Rejected();
            }
            messageCompleter.getDelivery().disposition(outcome);
            messageCompleter.getDelivery().settle();
        } finally {
            guard.unlock();
        }
    }

    /**
     * Builds the processor for one channel: creates the Proton collector and connection state machine,
     * registers the default handlers and starts the transport tick cycle.
     *
     * <p>Parameter meanings below are taken from the null-check labels in this constructor and from the
     * Kotlin metadata on the class.
     *
     * @param channel the Netty channel; its event loop is used as the executor and the channel is stored
     *                as the connection's context
     * @param z       server-mode flag, passed straight to the {@code ConnectionStateMachine}
     * @param str     local legal name; also used as the logger name
     * @param str2    remote legal name
     * @param str3    user name, may be null
     * @param str4    password, may be null
     * @throws TypeCastException  if {@code Proton.collector()} returns null
     * @throws ClassCastException if {@code Proton.collector()} returns something other than a {@code CollectorImpl}
     */
    public EventProcessor(@NotNull Channel channel, boolean z, @NotNull String str, @NotNull String str2, @Nullable String str3, @Nullable String str4) {
        Intrinsics.checkParameterIsNotNull(channel, "channel");
        Intrinsics.checkParameterIsNotNull(str, "localLegalName");
        Intrinsics.checkParameterIsNotNull(str2, "remoteLegalName");
        this.log = LoggerFactory.getLogger(str);
        this.lock = new ReentrantLock();
        ScheduledExecutorService eventLoop = channel.eventLoop();
        Intrinsics.checkExpressionValueIsNotNull(eventLoop, "channel.eventLoop()");
        this.executor = eventLoop;
        // Proton.collector() is declared to return the Collector interface, so the downcast must be
        // explicit; the decompiler dropped it, which left an assignment that does not compile.
        CollectorImpl collector = (CollectorImpl) Proton.collector();
        if (collector == null) {
            throw new TypeCastException("null cannot be cast to non-null type org.apache.qpid.proton.engine.impl.CollectorImpl");
        }
        this.collector = collector;
        this.handlers = new ArrayList<Handler>();
        this.stateMachine = new ConnectionStateMachine(z, this.collector, str, str2, str3, str4);
        this.connection = this.stateMachine.getConnection();
        // Handler order matters to dispatch order: processEvents() iterates this list front to back.
        addHandler((Handler) new Handshaker());
        addHandler((Handler) new FlowController(FLOW_WINDOW_SIZE));
        addHandler((Handler) this.stateMachine);
        this.connection.setContext(channel);
        // Kick off the self-rescheduling tick cycle.
        tick(this.stateMachine.getConnection());
    }
}
