package org.finos.tracdap.gateway.routing;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.ReferenceCountUtil;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.finos.tracdap.common.exception.EUnexpected;
import org.finos.tracdap.config.GwProtocol;
import org.finos.tracdap.gateway.exec.Route;
import org.finos.tracdap.gateway.proxy.grpc.GrpcProtocol;
import org.finos.tracdap.gateway.proxy.grpc.GrpcProxyBuilder;
import org.finos.tracdap.gateway.proxy.http.HttpProtocol;
import org.finos.tracdap.gateway.routing.CoreRouter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/finos/tracdap/gateway/routing/WebSocketsRouter.class */
public class WebSocketsRouter extends CoreRouter {
    private static final String WEBSOCKETS_PROTOCOL = "websockets";
    private static final Duration CLOSE_ON_ERROR_TIMEOUT = Duration.ofSeconds(5);
    private static final String TRAC_HEADER_PREFIX = "trac_";
    private final Logger log;
    private String upgradeUri;
    private HttpHeaders upgradeHeaders;
    private int routeId;
    private boolean upgradeComplete;
    private boolean firstMessageReceived;
    private boolean closeFrameSent;

    public WebSocketsRouter(List<Route> list, int i) {
        super(list, i, WEBSOCKETS_PROTOCOL);
        this.log = LoggerFactory.getLogger(getClass());
        this.routeId = -1;
        this.upgradeComplete = false;
        this.firstMessageReceived = false;
        this.closeFrameSent = false;
    }

    @Override // org.finos.tracdap.gateway.routing.CoreRouter
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof WebSocketServerProtocolHandler.HandshakeComplete)) {
            super.userEventTriggered(channelHandlerContext, obj);
            return;
        }
        WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) obj;
        this.upgradeUri = handshakeComplete.requestUri();
        this.upgradeHeaders = handshakeComplete.requestHeaders();
        this.upgradeComplete = true;
        this.log.info("conn = {}, websockets handshake complete, sub-protocol = [{}]", Integer.valueOf(this.connId), handshakeComplete.selectedSubprotocol());
        if (this.log.isTraceEnabled()) {
            this.log.trace("conn = {}, handshake headers: {}", Integer.valueOf(this.connId), handshakeComplete.requestHeaders().toString());
        }
    }

    public void channelRead(@Nonnull ChannelHandlerContext channelHandlerContext, @Nonnull Object obj) throws Exception {
        try {
            if (!this.upgradeComplete) {
                ReferenceCountUtil.retain(obj);
                super.channelRead(channelHandlerContext, obj);
                ReferenceCountUtil.release(obj);
                return;
            }
            if (!(obj instanceof WebSocketFrame)) {
                this.log.error("conn = {}, Unexpected message of type [{}]", Integer.valueOf(this.connId), obj.getClass().getSimpleName());
                throw new EUnexpected();
            }
            WebSocketFrame webSocketFrame = (WebSocketFrame) obj;
            if (this.log.isTraceEnabled()) {
                this.log.trace("conn = {}, inbound websockets frame, size = [{}], type = [{}]", new Object[]{Integer.valueOf(this.connId), Integer.valueOf(webSocketFrame.content().readableBytes()), webSocketFrame.getClass().getSimpleName()});
            }
            if (webSocketFrame instanceof BinaryWebSocketFrame) {
                processInboundBinaryFrame(channelHandlerContext, (BinaryWebSocketFrame) webSocketFrame);
            } else if (webSocketFrame instanceof TextWebSocketFrame) {
                processInboundTextFrame(channelHandlerContext);
            } else if (webSocketFrame instanceof ContinuationWebSocketFrame) {
                processInboundContinuationFrame(channelHandlerContext, (ContinuationWebSocketFrame) webSocketFrame);
            } else if (webSocketFrame instanceof PingWebSocketFrame) {
                processInboundPingFrame();
            } else if (webSocketFrame instanceof PongWebSocketFrame) {
                processInboundPongFrame();
            } else {
                if (!(webSocketFrame instanceof CloseWebSocketFrame)) {
                    throw new EUnexpected();
                }
                processInboundCloseFrame(channelHandlerContext, (CloseWebSocketFrame) webSocketFrame);
            }
        } finally {
            ReferenceCountUtil.release(obj);
        }
    }

    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        try {
            if (!this.upgradeComplete) {
                ReferenceCountUtil.retain(obj);
                super.write(channelHandlerContext, obj, channelPromise);
                destroyAssociation(obj);
                ReferenceCountUtil.release(obj);
                return;
            }
            if (!(obj instanceof WebSocketFrame)) {
                this.log.error("conn = {}, Unexpected message of type [{}]", Integer.valueOf(this.connId), obj.getClass().getSimpleName());
                throw new EUnexpected();
            }
            WebSocketFrame webSocketFrame = (WebSocketFrame) obj;
            if (this.log.isTraceEnabled()) {
                this.log.trace("conn = {}, outbound websockets frame, size = [{}], type = [{}]", new Object[]{Integer.valueOf(this.connId), Integer.valueOf(webSocketFrame.content().readableBytes()), webSocketFrame.getClass().getSimpleName()});
            }
            if (webSocketFrame instanceof CloseWebSocketFrame) {
                processOutboundCloseFrame(channelHandlerContext, (CloseWebSocketFrame) webSocketFrame, channelPromise);
            } else if (!this.closeFrameSent) {
                ReferenceCountUtil.retain(obj);
                channelHandlerContext.write(webSocketFrame, channelPromise);
            }
        } finally {
            destroyAssociation(obj);
            ReferenceCountUtil.release(obj);
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        reportErrorAndClose(channelHandlerContext, th.getMessage(), WebSocketCloseStatus.INTERNAL_SERVER_ERROR, th);
    }

    @Override // org.finos.tracdap.gateway.routing.CoreRouter
    protected ChannelInitializer<Channel> initializeProxyRoute(ChannelHandlerContext channelHandlerContext, CoreRouterLink coreRouterLink, Route route) {
        if (route.getConfig().getRouteType() != GwProtocol.GRPC) {
            throw new EUnexpected();
        }
        return new GrpcProxyBuilder(route.getConfig(), coreRouterLink, this.connId, HttpProtocol.WEBSOCKETS, GrpcProtocol.GRPC_WEBSOCKETS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.finos.tracdap.gateway.routing.CoreRouter
    public void reportProxyRouteError(ChannelHandlerContext channelHandlerContext, Throwable th, boolean z) {
        reportErrorAndClose(channelHandlerContext, th.getMessage(), WebSocketCloseStatus.INTERNAL_SERVER_ERROR, th);
    }

    private void processInboundBinaryFrame(ChannelHandlerContext channelHandlerContext, BinaryWebSocketFrame binaryWebSocketFrame) {
        if (this.firstMessageReceived) {
            relayFrame(channelHandlerContext, binaryWebSocketFrame, false);
            return;
        }
        Route lookupRoute = lookupRoute(URI.create(this.upgradeUri), HttpMethod.POST, 1L);
        if (lookupRoute == null) {
            reportErrorAndClose(channelHandlerContext, String.format("No route found for [%s]", this.upgradeUri), WebSocketCloseStatus.ENDPOINT_UNAVAILABLE, null);
            return;
        }
        getOrCreateTarget(channelHandlerContext, lookupRoute);
        this.routeId = lookupRoute.getIndex();
        this.firstMessageReceived = true;
        relayFrame(channelHandlerContext, addUpgradeHeaders(binaryWebSocketFrame), true);
    }

    private void processInboundTextFrame(ChannelHandlerContext channelHandlerContext) {
        reportErrorAndClose(channelHandlerContext, "Web socket text frames are not currently supported", WebSocketCloseStatus.INVALID_MESSAGE_TYPE, null);
    }

    private void processInboundContinuationFrame(ChannelHandlerContext channelHandlerContext, ContinuationWebSocketFrame continuationWebSocketFrame) {
        if (this.firstMessageReceived) {
            relayFrame(channelHandlerContext, continuationWebSocketFrame, false);
        } else {
            reportErrorAndClose(channelHandlerContext, String.format("Invalid stream for [%s] (continuation before start)", this.upgradeUri), WebSocketCloseStatus.INVALID_MESSAGE_TYPE, null);
        }
    }

    private void processInboundPingFrame() {
        if (this.log.isTraceEnabled()) {
            this.log.trace("conn = {}, inbound ping frame", Integer.valueOf(this.connId));
        }
    }

    private void processInboundPongFrame() {
        if (this.log.isTraceEnabled()) {
            this.log.trace("conn = {}, inbound pong frame", Integer.valueOf(this.connId));
        }
    }

    private void processInboundCloseFrame(ChannelHandlerContext channelHandlerContext, CloseWebSocketFrame closeWebSocketFrame) {
        if (this.closeFrameSent) {
            this.log.info("conn = {}, received close response, code = [{}]: {}", new Object[]{Integer.valueOf(this.connId), Integer.valueOf(closeWebSocketFrame.statusCode()), closeWebSocketFrame.reasonText()});
            channelHandlerContext.close();
            return;
        }
        this.log.info("conn = {}, received close request, code = [{}]: {}", new Object[]{Integer.valueOf(this.connId), Integer.valueOf(closeWebSocketFrame.statusCode()), closeWebSocketFrame.reasonText()});
        CloseWebSocketFrame closeWebSocketFrame2 = new CloseWebSocketFrame(closeWebSocketFrame.statusCode(), closeWebSocketFrame.reasonText());
        ChannelPromise newPromise = channelHandlerContext.newPromise();
        this.log.info("conn = {}, sending close response, code = [{}]: {}", new Object[]{Integer.valueOf(this.connId), Integer.valueOf(closeWebSocketFrame.statusCode()), closeWebSocketFrame.reasonText()});
        this.closeFrameSent = true;
        channelHandlerContext.write(closeWebSocketFrame2, newPromise);
        channelHandlerContext.flush();
        newPromise.addListener(future -> {
            channelHandlerContext.close();
        });
    }

    private void processOutboundCloseFrame(ChannelHandlerContext channelHandlerContext, CloseWebSocketFrame closeWebSocketFrame, ChannelPromise channelPromise) {
        if (this.closeFrameSent) {
            this.log.warn("conn = {}, ignoring duplicate outbound close request [{}]: {}", new Object[]{Integer.valueOf(this.connId), Integer.valueOf(closeWebSocketFrame.statusCode()), closeWebSocketFrame.reasonText()});
            return;
        }
        this.log.info("conn = {}, sending close request, code = [{}]: {}", new Object[]{Integer.valueOf(this.connId), Integer.valueOf(closeWebSocketFrame.statusCode()), closeWebSocketFrame.reasonText()});
        ReferenceCountUtil.retain(closeWebSocketFrame);
        this.closeFrameSent = true;
        channelHandlerContext.write(closeWebSocketFrame, channelPromise);
        channelHandlerContext.flush();
    }

    private BinaryWebSocketFrame addUpgradeHeaders(BinaryWebSocketFrame binaryWebSocketFrame) {
        StringBuilder sb = new StringBuilder();
        sb.append(":method: POST\r\n");
        sb.append(":scheme: http\r\n");
        sb.append(":path: ").append(this.upgradeUri).append("\r\n");
        Iterator it = this.upgradeHeaders.iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            if (((String) entry.getKey()).toLowerCase().startsWith(TRAC_HEADER_PREFIX)) {
                sb.append(((String) entry.getKey()).toLowerCase());
                sb.append(": ");
                sb.append((String) entry.getValue());
                sb.append("\r\n");
            }
        }
        ByteBuf copiedBuffer = Unpooled.copiedBuffer(sb, StandardCharsets.US_ASCII);
        CompositeByteBuf compositeBuffer = Unpooled.compositeBuffer();
        compositeBuffer.addComponent(copiedBuffer);
        compositeBuffer.addComponent(binaryWebSocketFrame.content().retain());
        compositeBuffer.writerIndex(copiedBuffer.writerIndex() + binaryWebSocketFrame.content().writerIndex());
        return new BinaryWebSocketFrame(compositeBuffer);
    }

    private void relayFrame(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame, boolean z) {
        try {
            CoreRouter.TargetChannelState target = getTarget(this.routeId);
            if (target == null) {
                reportErrorAndClose(channelHandlerContext, String.format("Proxy connection has been closed for [%s]", this.upgradeUri), WebSocketCloseStatus.ENDPOINT_UNAVAILABLE, null);
            } else {
                relayMessage(target, webSocketFrame.retain());
                flushMessages(target);
            }
        } catch (Throwable th) {
            if (z) {
                ReferenceCountUtil.release(webSocketFrame);
            }
            throw th;
        }
    }

    private void reportErrorAndClose(ChannelHandlerContext channelHandlerContext, String str, WebSocketCloseStatus webSocketCloseStatus, Throwable th) {
        if (th != null) {
            this.log.error("conn = {}, sending close request with code [{}, {}]: {}", new Object[]{Integer.valueOf(this.connId), Integer.valueOf(webSocketCloseStatus.code()), webSocketCloseStatus.reasonText(), str, th});
        } else {
            this.log.error("conn = {}, sending close request with code [{}, {}]: {}", new Object[]{Integer.valueOf(this.connId), Integer.valueOf(webSocketCloseStatus.code()), webSocketCloseStatus.reasonText(), str});
        }
        ChannelPromise newPromise = channelHandlerContext.newPromise();
        channelHandlerContext.writeAndFlush(new CloseWebSocketFrame(webSocketCloseStatus, str), newPromise);
        channelHandlerContext.executor().schedule(() -> {
            if (newPromise.isDone()) {
                return;
            }
            channelHandlerContext.close();
        }, CLOSE_ON_ERROR_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
    }

    @Override // org.finos.tracdap.gateway.routing.CoreRouter
    public /* bridge */ /* synthetic */ void disconnect(ChannelHandlerContext channelHandlerContext, ChannelPromise channelPromise) throws Exception {
        super.disconnect(channelHandlerContext, channelPromise);
    }

    @Override // org.finos.tracdap.gateway.routing.CoreRouter
    public /* bridge */ /* synthetic */ void close(ChannelHandlerContext channelHandlerContext, ChannelPromise channelPromise) throws Exception {
        super.close(channelHandlerContext, channelPromise);
    }

    @Override // org.finos.tracdap.gateway.routing.CoreRouter
    public /* bridge */ /* synthetic */ void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelRegistered(channelHandlerContext);
    }

    @Override // org.finos.tracdap.gateway.routing.CoreRouter
    public /* bridge */ /* synthetic */ void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.handlerRemoved(channelHandlerContext);
    }

    @Override // org.finos.tracdap.gateway.routing.CoreRouter
    public /* bridge */ /* synthetic */ void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.handlerAdded(channelHandlerContext);
    }
}
