package io.streamnative.pulsar.handlers.kop;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.io.Closeable;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.naming.AuthenticationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestUtils;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.ResponseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.class */
public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
    private static final Logger log = LoggerFactory.getLogger(KafkaCommandDecoder.class);
    protected ChannelHandlerContext ctx;
    protected SocketAddress remoteAddress;
    protected AtomicBoolean isActive = new AtomicBoolean(false);
    private Queue<ResponseAndRequest> requestsQueue;

    /* renamed from: io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder$1, reason: invalid class name */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder$1.class */
    /*
     * Compiler-generated lookup table for switching on ApiKeys, surfaced by the decompiler as a named class.
     * The array is indexed by ApiKeys#ordinal() and holds a dense case label (1..19) for every API key that
     * has a dedicated case; every other slot keeps the int default of 0 and therefore matches no case label.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$ApiKeys = new int[ApiKeys.values().length];

        static {
            // Each assignment is wrapped individually: if an enum constant cannot be resolved at runtime
            // (NoSuchFieldError), only that entry is skipped and its slot stays 0.
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.API_VERSIONS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.METADATA.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.PRODUCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.FIND_COORDINATOR.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.LIST_OFFSETS.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.OFFSET_FETCH.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.OFFSET_COMMIT.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.FETCH.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.JOIN_GROUP.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.SYNC_GROUP.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.HEARTBEAT.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.LEAVE_GROUP.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.DESCRIBE_GROUPS.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.LIST_GROUPS.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.DELETE_GROUPS.ordinal()] = 15;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.SASL_HANDSHAKE.ordinal()] = 16;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.SASL_AUTHENTICATE.ordinal()] = 17;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.CREATE_TOPICS.ordinal()] = 18;
            } catch (NoSuchFieldError e18) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$ApiKeys[ApiKeys.DESCRIBE_CONFIGS.ordinal()] = 19;
            } catch (NoSuchFieldError e19) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder$KafkaHeaderAndRequest.class */
    /**
     * A parsed Kafka request together with its header, the Netty buffer it was decoded from and the
     * address of the peer that sent it.
     *
     * <p>The constructor retains the buffer; {@link #close()} releases that reference again, so every
     * instance must be closed exactly once.
     */
    public static class KafkaHeaderAndRequest implements Closeable {
        /** Value reported by {@link #getClientHost()} when no remote address is known. */
        private static final String DEFAULT_CLIENT_HOST = "";

        private final RequestHeader header;
        private final AbstractRequest request;
        private final ByteBuf buffer;
        private final SocketAddress remoteAddress;
        private final boolean headerless;

        /**
         * @param requestHeader   parsed request header
         * @param abstractRequest parsed request body
         * @param byteBuf         buffer the request was read from; an extra reference is retained here
         * @param socketAddress   address of the sending peer, may be {@code null}
         * @param z               value later reported by {@link #isHeaderless()}
         */
        KafkaHeaderAndRequest(RequestHeader requestHeader, AbstractRequest abstractRequest, ByteBuf byteBuf, SocketAddress socketAddress, boolean z) {
            this.header = requestHeader;
            this.request = abstractRequest;
            // Take our own reference so the caller may release its reference independently.
            this.buffer = byteBuf.retain();
            this.remoteAddress = socketAddress;
            this.headerless = z;
        }

        /** @return the parsed request header */
        public RequestHeader getHeader() {
            return header;
        }

        /** @return the headerless flag supplied at construction time */
        public boolean isHeaderless() {
            return headerless;
        }

        /** @return the parsed request body */
        public AbstractRequest getRequest() {
            return request;
        }

        /** @return the peer address supplied at construction time, possibly {@code null} */
        public SocketAddress getRemoteAddress() {
            return remoteAddress;
        }

        /** @return the string form of the remote address, or an empty string when it is unknown */
        public String getClientHost() {
            if (remoteAddress == null) {
                return DEFAULT_CLIENT_HOST;
            }
            return remoteAddress.toString();
        }

        /** @return the retained buffer backing this request */
        public ByteBuf getBuffer() {
            return buffer;
        }

        @Override
        public String toString() {
            return String.format("KafkaHeaderAndRequest(header=%s, request=%s, remoteAddress=%s)", header, request, remoteAddress);
        }

        /** Releases the buffer reference taken by the constructor. */
        @Override
        public void close() {
            buffer.release();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder$KafkaHeaderAndResponse.class */
    /**
     * A Kafka response paired with the response header and the API version of the request it answers.
     * Instances are created through {@link #responseForRequest}.
     */
    public static class KafkaHeaderAndResponse implements Closeable {
        private final short apiVersion;
        private final ResponseHeader header;
        private final AbstractResponse response;

        private KafkaHeaderAndResponse(short s, ResponseHeader responseHeader, AbstractResponse abstractResponse) {
            this.apiVersion = s;
            this.header = responseHeader;
            this.response = abstractResponse;
        }

        /**
         * Builds the response wrapper for a request, taking the API version and the response header
         * from the request's header.
         *
         * @param kafkaHeaderAndRequest request being answered
         * @param abstractResponse      response body
         * @return the combined header and response
         */
        static KafkaHeaderAndResponse responseForRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, AbstractResponse abstractResponse) {
            short requestVersion = kafkaHeaderAndRequest.getHeader().apiVersion();
            ResponseHeader responseHeader = kafkaHeaderAndRequest.getHeader().toResponseHeader();
            return new KafkaHeaderAndResponse(requestVersion, responseHeader, abstractResponse);
        }

        /** @return the API version copied from the originating request header */
        public short getApiVersion() {
            return apiVersion;
        }

        /** @return the response header */
        public ResponseHeader getHeader() {
            return header;
        }

        /** @return the response body */
        public AbstractResponse getResponse() {
            return response;
        }

        @Override
        public String toString() {
            String headerText = header.toStruct().toString();
            String responseText = response.toString(getApiVersion());
            return String.format("KafkaHeaderAndResponse(header=%s,responseFuture=%s)", headerText, responseText);
        }

        /** No resources are held by this wrapper; closing is a no-op. */
        @Override
        public void close() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder$ResponseAndRequest.class */
    /**
     * Immutable pair of a request and the future that will carry its response; this is the element
     * type of the decoder's pending-request queue.
     */
    public static class ResponseAndRequest {
        private final CompletableFuture<AbstractResponse> responseFuture;
        private final KafkaHeaderAndRequest request;

        ResponseAndRequest(CompletableFuture<AbstractResponse> completableFuture, KafkaHeaderAndRequest kafkaHeaderAndRequest) {
            this.responseFuture = completableFuture;
            this.request = kafkaHeaderAndRequest;
        }

        /**
         * Static factory equivalent to the constructor.
         *
         * @param completableFuture     future that will carry the response
         * @param kafkaHeaderAndRequest request the future belongs to
         * @return a new pair
         */
        public static ResponseAndRequest of(CompletableFuture<AbstractResponse> completableFuture, KafkaHeaderAndRequest kafkaHeaderAndRequest) {
            return new ResponseAndRequest(completableFuture, kafkaHeaderAndRequest);
        }

        /** @return the request half of the pair */
        public KafkaHeaderAndRequest getRequest() {
            return request;
        }

        /** @return the future half of the pair */
        public CompletableFuture<AbstractResponse> getResponseFuture() {
            return responseFuture;
        }
    }

    /**
     * Records the channel context and peer address, marks the decoder active and creates a fresh
     * pending-request queue. The event is forwarded to the superclass before any state is set.
     *
     * @param channelHandlerContext context of the channel that became active
     * @throws Exception if the superclass handler fails
     */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        remoteAddress = channelHandlerContext.channel().remoteAddress();
        ctx = channelHandlerContext;
        isActive.set(true);
        requestsQueue = Queues.newConcurrentLinkedQueue();
    }

    /**
     * Logs the failure together with the remote address and then closes the decoder via {@link #close()}.
     *
     * @param channelHandlerContext context on which the exception surfaced
     * @param th                    the exception; passed last to the logger so its stack trace is recorded
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        log.error("[{}] Got exception: {}", remoteAddress, th.getMessage(), th);
        close();
    }

    /**
     * Closes the channel through the context captured in {@code channelActive}.
     * NOTE(review): the context is only assigned in {@code channelActive}; calling this earlier would
     * dereference {@code null}.
     */
    protected void close() {
        ctx.close();
    }

    /**
     * Applies back pressure based on channel writability: auto-read is switched off (and pending
     * writes are flushed) while the channel is not writable, and switched back on once it is.
     * The event is always propagated to the next handler afterwards.
     *
     * @param channelHandlerContext context of the channel whose writability changed
     * @throws Exception declared by the handler contract
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Channel writability has changed to: {}", channelHandlerContext.channel().isWritable());
        }
        if (!channelHandlerContext.channel().isWritable()) {
            log.debug("channel is not writable, disable auto reading for back pressing");
            channelHandlerContext.channel().config().setAutoRead(false);
            channelHandlerContext.flush();
        } else {
            channelHandlerContext.channel().config().setAutoRead(true);
        }
        channelHandlerContext.fireChannelWritabilityChanged();
    }

    /**
     * Decodes a request when the sender's address is not known; delegates to the two-argument
     * overload with a {@code null} address.
     *
     * @param byteBuf buffer holding one complete request
     * @return the decoded request
     */
    protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf byteBuf) {
        final SocketAddress unknownPeer = null;
        return byteBufToRequest(byteBuf, unknownPeer);
    }

    /**
     * Decodes one Kafka request (header and body) from the given buffer.
     *
     * <p>If the header cannot be parsed, the bytes are handed to
     * {@code RequestUtils.fixHeaderlessSASLAuthenticateV0}; when that yields a repaired buffer the
     * request is flagged as headerless and parsing continues on the repaired bytes. Otherwise the
     * header is parsed again from the rewound buffer, so the parse error surfaces to the caller.
     *
     * <p>An {@code API_VERSIONS} request with an unsupported version is not body-parsed; a version-0
     * {@link ApiVersionsRequest} carrying the unsupported version is substituted.
     *
     * @param byteBuf       buffer holding one complete request; must have readable bytes
     * @param socketAddress address of the sender, may be {@code null}
     * @return the decoded request, which holds its own retained reference to {@code byteBuf}
     * @throws IllegalArgumentException if {@code byteBuf} has no readable bytes
     */
    protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf byteBuf, SocketAddress socketAddress) {
        Preconditions.checkArgument(byteBuf.readableBytes() > 0);
        ByteBuffer payload = byteBuf.nioBuffer();
        boolean headerless = false;
        RequestHeader header;
        try {
            header = RequestHeader.parse(payload);
        } catch (InvalidRequestException e) {
            // The failed parse moved the position; start over from the beginning of the frame.
            payload.rewind();
            Optional<ByteBuffer> repaired = RequestUtils.fixHeaderlessSASLAuthenticateV0(payload);
            headerless = repaired.isPresent();
            if (headerless) {
                log.info("Caught InvalidRequestException while reading headerless request. Verified it is a SASL_AUTHENTICATE request. Converted to a correct request.");
                payload = repaired.get();
            }
            header = RequestHeader.parse(payload);
        }

        final AbstractRequest body;
        if (isUnsupportedApiVersionsRequest(header)) {
            body = new ApiVersionsRequest((short) 0, Short.valueOf(header.apiVersion()));
        } else {
            ApiKeys apiKey = header.apiKey();
            short apiVersion = header.apiVersion();
            body = AbstractRequest.parseRequest(apiKey, apiVersion, apiKey.parseRequest(apiVersion, payload));
        }
        return new KafkaHeaderAndRequest(header, body, byteBuf, socketAddress, headerless);
    }

    /**
     * Serialises a response for the given request and closes the request, releasing the buffer
     * reference it holds.
     *
     * <p>The request is closed on every path, including when serialisation throws, so callers must
     * not close it again after invoking this method.
     *
     * <p>For an {@code API_VERSIONS} request whose version is not supported, the response is encoded
     * with the oldest supported {@code API_VERSIONS} version instead of the requested one.
     *
     * @param abstractResponse      response body to serialise
     * @param kafkaHeaderAndRequest request being answered; closed by this method
     * @return the serialised response message
     */
    protected ResponseStructMessage responseToStructMessage(AbstractResponse abstractResponse, KafkaHeaderAndRequest kafkaHeaderAndRequest) {
        // Resources are closed in reverse order: the response wrapper first, then the request.
        try (KafkaHeaderAndRequest requestToRelease = kafkaHeaderAndRequest;
             KafkaHeaderAndResponse headerAndResponse = KafkaHeaderAndResponse.responseForRequest(kafkaHeaderAndRequest, abstractResponse)) {
            short apiVersion = headerAndResponse.getApiVersion();
            if (kafkaHeaderAndRequest.getHeader().apiKey() == ApiKeys.API_VERSIONS && !ApiKeys.API_VERSIONS.isVersionSupported(apiVersion)) {
                apiVersion = ApiKeys.API_VERSIONS.oldestVersion();
            }
            return ResponseUtils.structResponse(apiVersion, headerAndResponse.getHeader(), headerAndResponse.getResponse(), kafkaHeaderAndRequest.isHeaderless());
        }
    }

    /**
     * Decodes one inbound frame into a Kafka request, enqueues it with a fresh response future and
     * hands it to the matching handler.
     *
     * <p>Flow:
     * <ol>
     *   <li>The frame is decoded with {@link #byteBufToRequest(ByteBuf, SocketAddress)}; a decoding
     *       failure propagates to the caller.</li>
     *   <li>A response future is created whose completion schedules
     *       {@link #writeAndFlushResponseToClient(Channel)} on the channel's event loop, and the
     *       (future, request) pair is appended to the pending queue.</li>
     *   <li>If the decoder is inactive the request goes to {@code handleInactive}; if the request is
     *       not yet authenticated it goes to {@code authenticate}; otherwise it is dispatched by API
     *       key.</li>
     * </ol>
     *
     * <p>Any exception raised while handling the request is logged and the decoder is closed.
     * The inbound buffer is released exactly once on every path, including when decoding fails; the
     * decoded request keeps its own retained reference.
     *
     * @param channelHandlerContext context of the channel the frame arrived on
     * @param obj                   the inbound message; must be a {@link ByteBuf}
     * @throws Exception if decoding the frame fails
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        ByteBuf byteBuf = (ByteBuf) obj;
        Channel channel = channelHandlerContext.channel();
        SocketAddress socketAddress = channel != null ? channel.remoteAddress() : null;
        try {
            // Decoding sits inside the try so that a malformed frame still gets its buffer released.
            KafkaHeaderAndRequest request = byteBufToRequest(byteBuf, socketAddress);
            try {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Received kafka cmd {}, the request content is: {}",
                            channel != null ? channel.remoteAddress() : "Null channel", request.getHeader(), request);
                }
                CompletableFuture<AbstractResponse> responseFuture = new CompletableFuture<>();
                // Responses are written from the event loop, in queue order, once their future completes.
                responseFuture.whenComplete((response, error) ->
                        channelHandlerContext.channel().eventLoop().execute(() -> writeAndFlushResponseToClient(channel)));
                this.requestsQueue.add(ResponseAndRequest.of(responseFuture, request));

                if (!this.isActive.get()) {
                    handleInactive(request, responseFuture);
                } else if (!hasAuthenticated(request)) {
                    authenticate(request, responseFuture);
                } else {
                    dispatchRequest(request, responseFuture);
                }
            } catch (AuthenticationException e) {
                log.error("unexpected error in authenticate:", e);
                close();
            } catch (Exception e) {
                log.error("error while handle command:", e);
                close();
            }
        } finally {
            // The decoded request holds its own reference; drop the one that came with the inbound message.
            byteBuf.release();
        }
    }

    /** Routes an authenticated request to the handler for its API key; unknown keys go to {@code handleError}. */
    private void dispatchRequest(KafkaHeaderAndRequest request, CompletableFuture<AbstractResponse> responseFuture) {
        switch (request.getHeader().apiKey()) {
            case API_VERSIONS:
                handleApiVersionsRequest(request, responseFuture);
                break;
            case METADATA:
                handleTopicMetadataRequest(request, responseFuture);
                break;
            case PRODUCE:
                handleProduceRequest(request, responseFuture);
                break;
            case FIND_COORDINATOR:
                handleFindCoordinatorRequest(request, responseFuture);
                break;
            case LIST_OFFSETS:
                handleListOffsetRequest(request, responseFuture);
                break;
            case OFFSET_FETCH:
                handleOffsetFetchRequest(request, responseFuture);
                break;
            case OFFSET_COMMIT:
                handleOffsetCommitRequest(request, responseFuture);
                break;
            case FETCH:
                handleFetchRequest(request, responseFuture);
                break;
            case JOIN_GROUP:
                handleJoinGroupRequest(request, responseFuture);
                break;
            case SYNC_GROUP:
                handleSyncGroupRequest(request, responseFuture);
                break;
            case HEARTBEAT:
                handleHeartbeatRequest(request, responseFuture);
                break;
            case LEAVE_GROUP:
                handleLeaveGroupRequest(request, responseFuture);
                break;
            case DESCRIBE_GROUPS:
                handleDescribeGroupRequest(request, responseFuture);
                break;
            case LIST_GROUPS:
                handleListGroupsRequest(request, responseFuture);
                break;
            case DELETE_GROUPS:
                handleDeleteGroupsRequest(request, responseFuture);
                break;
            case SASL_HANDSHAKE:
                handleSaslHandshake(request, responseFuture);
                break;
            case SASL_AUTHENTICATE:
                handleSaslAuthenticate(request, responseFuture);
                break;
            case CREATE_TOPICS:
                handleCreateTopics(request, responseFuture);
                break;
            case DESCRIBE_CONFIGS:
                handleDescribeConfigs(request, responseFuture);
                break;
            default:
                handleError(request, responseFuture);
                break;
        }
    }

    /**
     * Writes out responses from the head of the pending queue for as long as the decoder is active
     * and the head entry's future is done, so responses leave in the order requests arrived.
     *
     * <p>Failures for one entry are logged and do not stop the drain. When an entry fails before its
     * request has been handed to {@link #responseToStructMessage} (for example because the future
     * completed exceptionally), the request is closed here so its retained buffer is not leaked;
     * no response is written for such an entry.
     *
     * @param channel channel the responses are written to
     */
    protected void writeAndFlushResponseToClient(Channel channel) {
        final Queue<ResponseAndRequest> queue = this.requestsQueue;
        if (queue == null) {
            return;
        }
        while (this.isActive.get()) {
            ResponseAndRequest head = queue.peek();
            if (head == null || !head.getResponseFuture().isDone()) {
                return;
            }
            ResponseAndRequest entry = queue.poll();
            if (entry == null) {
                // Emptied by another consumer between peek and poll.
                return;
            }
            KafkaHeaderAndRequest request = entry.getRequest();
            // responseToStructMessage closes the request on every path; until it is invoked, we own it.
            boolean handedOff = false;
            try {
                AbstractResponse response = entry.getResponseFuture().get();
                if (log.isDebugEnabled()) {
                    log.debug("Write kafka cmd response back to client. \n\trequest content: {} \n\tresponse content: {}", request.toString(), response.toString(request.getRequest().version()));
                    log.debug("Write kafka cmd responseFuture back to client. request: {}", request.getHeader());
                }
                handedOff = true;
                channel.writeAndFlush(responseToStructMessage(response, request));
            } catch (Exception e) {
                log.error("error to get Response ByteBuf:", e);
                if (!handedOff) {
                    request.close();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Drains the whole pending queue while the channel is going away: every entry whose future is
     * still open is completed with the request's error response for a
     * {@link LeaderNotAvailableException}, and the resulting response is written to the channel.
     * Entries whose future was already completed keep their existing result.
     *
     * <p>Failures for one entry are logged and do not stop the drain. When an entry fails before its
     * request has been handed to {@link #responseToStructMessage}, the request is closed here so its
     * retained buffer is not leaked.
     *
     * @param channel channel the responses are written to
     */
    public void writeAndFlushWhenInactiveChannel(Channel channel) {
        final Queue<ResponseAndRequest> queue = this.requestsQueue;
        if (queue == null) {
            return;
        }
        ResponseAndRequest entry;
        while ((entry = queue.poll()) != null) {
            KafkaHeaderAndRequest request = entry.getRequest();
            // responseToStructMessage closes the request on every path; until it is invoked, we own it.
            boolean handedOff = false;
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Channel Closing! Write kafka cmd responseFuture back to client. request: {}", request.getHeader());
                }
                CompletableFuture<AbstractResponse> responseFuture = entry.getResponseFuture();
                // complete() is a no-op when a handler already finished this future.
                responseFuture.complete(request.getRequest().getErrorResponse(new LeaderNotAvailableException("Channel is closing!")));
                AbstractResponse response = responseFuture.get();
                handedOff = true;
                channel.writeAndFlush(responseToStructMessage(response, request));
            } catch (Exception e) {
                log.error("error to get Response ByteBuf:", e);
                if (!handedOff) {
                    request.close();
                }
            }
        }
    }

    // Hooks invoked from channelRead. Each receives the decoded request and the response future that
    // channelRead has already placed on the pending queue; the response is written to the client only
    // after that future completes, so implementations are expected to complete it.

    /**
     * Consulted by {@code channelRead} for every request received while the decoder is active.
     *
     * @return {@code false} to route the request to {@link #authenticate} instead of the per-API handlers
     */
    protected abstract boolean hasAuthenticated(KafkaHeaderAndRequest kafkaHeaderAndRequest);

    /**
     * Invoked by {@code channelRead} when {@link #hasAuthenticated} returned {@code false}.
     *
     * @throws AuthenticationException on failure; {@code channelRead} logs it and closes the decoder
     */
    protected abstract void authenticate(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture) throws AuthenticationException;

    /** Invoked for requests whose API key has no dedicated handler in {@code channelRead}. */
    protected abstract void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for any request that arrives while {@code isActive} is {@code false}. */
    protected abstract void handleInactive(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.API_VERSIONS} requests. */
    protected abstract void handleApiVersionsRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.METADATA} requests. */
    protected abstract void handleTopicMetadataRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.PRODUCE} requests. */
    protected abstract void handleProduceRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.FIND_COORDINATOR} requests. */
    protected abstract void handleFindCoordinatorRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.LIST_OFFSETS} requests. */
    protected abstract void handleListOffsetRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.OFFSET_FETCH} requests. */
    protected abstract void handleOffsetFetchRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.OFFSET_COMMIT} requests. */
    protected abstract void handleOffsetCommitRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.FETCH} requests. */
    protected abstract void handleFetchRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.JOIN_GROUP} requests. */
    protected abstract void handleJoinGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.SYNC_GROUP} requests. */
    protected abstract void handleSyncGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.HEARTBEAT} requests. */
    protected abstract void handleHeartbeatRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.LEAVE_GROUP} requests. */
    protected abstract void handleLeaveGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.DESCRIBE_GROUPS} requests. */
    protected abstract void handleDescribeGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.LIST_GROUPS} requests. */
    protected abstract void handleListGroupsRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.DELETE_GROUPS} requests. */
    protected abstract void handleDeleteGroupsRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.SASL_AUTHENTICATE} requests. */
    protected abstract void handleSaslAuthenticate(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.SASL_HANDSHAKE} requests. */
    protected abstract void handleSaslHandshake(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.CREATE_TOPICS} requests. */
    protected abstract void handleCreateTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /** Invoked for {@code ApiKeys.DESCRIBE_CONFIGS} requests. */
    protected abstract void handleDescribeConfigs(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> completableFuture);

    /**
     * Tells whether the header belongs to an {@code API_VERSIONS} request whose version is not
     * supported for that API key.
     *
     * @param requestHeader header of the request under inspection
     * @return {@code true} only for an {@code API_VERSIONS} request with an unsupported version
     */
    private static boolean isUnsupportedApiVersionsRequest(RequestHeader requestHeader) {
        if (requestHeader.apiKey() != ApiKeys.API_VERSIONS) {
            return false;
        }
        return !ApiKeys.API_VERSIONS.isVersionSupported(requestHeader.apiVersion());
    }

    /** @return the channel context captured in {@code channelActive}, or {@code null} before that */
    @VisibleForTesting
    public ChannelHandlerContext getCtx() {
        return ctx;
    }

    /** @return the peer address captured in {@code channelActive}, or {@code null} before that */
    public SocketAddress getRemoteAddress() {
        return remoteAddress;
    }

    /** @return the live activity flag itself (not a copy); it is set to {@code true} in {@code channelActive} */
    public AtomicBoolean getIsActive() {
        return isActive;
    }
}
