package com.ibasco.agql.core;

import com.ibasco.agql.core.exceptions.ChannelClosedException;
import com.ibasco.agql.core.exceptions.TransportException;
import com.ibasco.agql.core.exceptions.WriteInProgressException;
import com.ibasco.agql.core.util.Errors;
import com.ibasco.agql.core.util.GeneralOptions;
import com.ibasco.agql.core.util.Netty;
import com.ibasco.agql.core.util.Options;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.ResourceLeakDetector;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ibasco/agql/core/NettyTransport.class */
public class NettyTransport implements Transport<NettyChannelContext, NettyChannelContext> {
    private static final Logger log;
    private static final ChannelFutureListener COMPLETE_ON_WRITE;
    private final Options options;
    static final /* synthetic */ boolean $assertionsDisabled;

    public NettyTransport(Options options) {
        this.options = (Options) Objects.requireNonNull(options, "[INIT] TRANSPORT => Missing options");
        if (log.isErrorEnabled()) {
            ResourceLeakDetector.Level level = (ResourceLeakDetector.Level) options.getOrDefault(GeneralOptions.RESOURCE_LEAK_DETECTOR_LEVEL);
            ResourceLeakDetector.setLevel(level);
            log.debug("[INIT] TRANSPORT => Set ResourceLeakDetector level to '{}'", level.name());
        }
    }

    @Override // com.ibasco.agql.core.Transport
    public final CompletableFuture<NettyChannelContext> send(NettyChannelContext nettyChannelContext) {
        checkContext(nettyChannelContext);
        CompletableFuture completedFuture = CompletableFuture.completedFuture(nettyChannelContext);
        return nettyChannelContext.inEventLoop() ? completedFuture.thenCompose(this::writeAndNotify).handle(this::finalize) : completedFuture.thenComposeAsync(this::writeAndNotify, (Executor) nettyChannelContext.eventLoop()).handleAsync(this::finalize, (Executor) nettyChannelContext.eventLoop());
    }

    private static void checkContext(NettyChannelContext nettyChannelContext) {
        if (nettyChannelContext == null) {
            throw new IllegalStateException("Channel context must not be null");
        }
        if (nettyChannelContext.properties().envelope() == null) {
            throw new IllegalStateException("No valid request attached to channel context: " + nettyChannelContext);
        }
    }

    private CompletableFuture<NettyChannelContext> writeAndNotify(NettyChannelContext nettyChannelContext) {
        Envelope envelope;
        Channel channel = nettyChannelContext.channel();
        Objects.requireNonNull(channel, "Channel is null");
        if (!$assertionsDisabled && !nettyChannelContext.inEventLoop()) {
            throw new AssertionError();
        }
        if (!nettyChannelContext.isValid()) {
            throw new IllegalStateException("Context is no longer in a valid state", new ChannelClosedException(channel));
        }
        if (nettyChannelContext.properties().writeInProgress()) {
            throw new WriteInProgressException(String.format("A write is currently in-progress for context '%s'", nettyChannelContext));
        }
        CompletableFuture<NettyChannelContext> beginWrite = nettyChannelContext.properties().beginWrite();
        if (!$assertionsDisabled && beginWrite == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && beginWrite.isDone()) {
            throw new AssertionError();
        }
        try {
            envelope = nettyChannelContext.properties().envelope();
        } catch (Exception e) {
            log.debug("{} TRANSPORT => Error occured during writeAndNotify operation", nettyChannelContext.id(), e);
            nettyChannelContext.properties().endWrite(e);
        }
        if (envelope == null) {
            throw new IllegalStateException("Request envelope is not present in channel");
        }
        log.debug("{} TRANSPORT => Sending request '{}' to transport", nettyChannelContext.id(), envelope.content());
        ChannelFuture writeAndFlush = channel.writeAndFlush(envelope);
        if (writeAndFlush.isDone()) {
            COMPLETE_ON_WRITE.operationComplete(writeAndFlush);
        } else {
            writeAndFlush.addListener(COMPLETE_ON_WRITE);
        }
        return beginWrite;
    }

    private NettyChannelContext finalize(NettyChannelContext nettyChannelContext, Throwable th) {
        if (th != null) {
            log.debug("TRANSPORT => Error during write operation", th);
            throw new TransportException("Failed to send request via transport", Errors.unwrap(th));
        }
        if (nettyChannelContext.channel() == null) {
            throw new IllegalStateException("Channel is null");
        }
        if ($assertionsDisabled || nettyChannelContext.inEventLoop()) {
            return nettyChannelContext;
        }
        throw new AssertionError();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    @Override // com.ibasco.agql.core.util.ConfigurationSupport
    public final Options getOptions() {
        return this.options;
    }

    static {
        $assertionsDisabled = !NettyTransport.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(NettyTransport.class);
        COMPLETE_ON_WRITE = channelFuture -> {
            Channel channel = channelFuture.channel();
            NettyChannelContext context = NettyChannelContext.getContext(channel);
            try {
                if (!$assertionsDisabled && !context.channel().id().equals(channel.id())) {
                    throw new AssertionError();
                }
                if (channelFuture.isSuccess()) {
                    log.debug("{} TRANSPORT => Request has been sent and processed through the channel's pipeline (Request: {})", context.id(), context.properties().request());
                    context.properties().endWrite();
                } else {
                    log.debug("{} TRANSPORT => An error occured while sending request through the channel's pipeline", context.id(), channelFuture.cause());
                    context.properties().endWrite(channelFuture.cause());
                }
                if (!$assertionsDisabled && context.properties().writeInProgress()) {
                    throw new AssertionError();
                }
            } catch (Exception e) {
                log.debug("{} TRANSPORT => Error occured during write operation", Netty.id(channel), e);
                if (context.properties().writeInProgress()) {
                    context.properties().endWrite(e);
                }
            }
        };
    }
}
