package com.tvd12.ezymq.kafka;

import com.tvd12.ezyfox.builder.EzyBuilder;
import com.tvd12.ezyfox.util.EzyCloseable;
import com.tvd12.ezyfox.util.EzyLoggable;
import com.tvd12.ezyfox.util.EzyStartable;
import com.tvd12.ezymq.kafka.codec.EzyKafkaDataCodec;
import com.tvd12.ezymq.kafka.endpoint.EzyKafkaServer;
import com.tvd12.ezymq.kafka.handler.EzyKafkaMessageHandlers;
import com.tvd12.ezymq.kafka.handler.EzyKafkaMessageInterceptor;
import com.tvd12.ezymq.kafka.handler.EzyKafkaRecordsHandler;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

/* loaded from: input_file:com/tvd12/ezymq/kafka/EzyKafkaConsumer.class */
public class EzyKafkaConsumer extends EzyLoggable implements EzyKafkaRecordsHandler, EzyStartable, EzyCloseable {
    protected EzyKafkaMessageInterceptor messageInterceptor;
    protected final EzyKafkaServer server;
    protected final EzyKafkaDataCodec dataCodec;
    protected final EzyKafkaMessageHandlers messageHandlers;
    protected static final byte[] BINARY_TYPE = {98};

    /* loaded from: input_file:com/tvd12/ezymq/kafka/EzyKafkaConsumer$Builder.class */
    public static class Builder implements EzyBuilder<EzyKafkaConsumer> {
        protected EzyKafkaServer server;
        protected EzyKafkaDataCodec dataCodec;
        protected EzyKafkaMessageHandlers messageHandlers;
        protected EzyKafkaMessageInterceptor messageInterceptor;

        public Builder server(EzyKafkaServer ezyKafkaServer) {
            this.server = ezyKafkaServer;
            return this;
        }

        public Builder dataCodec(EzyKafkaDataCodec ezyKafkaDataCodec) {
            this.dataCodec = ezyKafkaDataCodec;
            return this;
        }

        public Builder messageHandlers(EzyKafkaMessageHandlers ezyKafkaMessageHandlers) {
            this.messageHandlers = ezyKafkaMessageHandlers;
            return this;
        }

        public Builder messageInterceptor(EzyKafkaMessageInterceptor ezyKafkaMessageInterceptor) {
            this.messageInterceptor = ezyKafkaMessageInterceptor;
            return this;
        }

        /* renamed from: build, reason: merged with bridge method [inline-methods] */
        public EzyKafkaConsumer m1build() {
            EzyKafkaConsumer ezyKafkaConsumer = new EzyKafkaConsumer(this.server, this.dataCodec, this.messageHandlers);
            ezyKafkaConsumer.setMessageInterceptor(this.messageInterceptor);
            return ezyKafkaConsumer;
        }
    }

    public EzyKafkaConsumer(EzyKafkaServer ezyKafkaServer, EzyKafkaDataCodec ezyKafkaDataCodec, EzyKafkaMessageHandlers ezyKafkaMessageHandlers) {
        this.server = ezyKafkaServer;
        this.server.setRecordsHandler(this);
        this.dataCodec = ezyKafkaDataCodec;
        this.messageHandlers = ezyKafkaMessageHandlers;
    }

    public void start() throws Exception {
        this.server.start();
    }

    public void close() {
        this.server.close();
    }

    @Override // com.tvd12.ezymq.kafka.handler.EzyKafkaRecordsHandler
    public void handleRecord(ConsumerRecord consumerRecord) {
        String str = consumerRecord.topic();
        Object key = consumerRecord.key();
        String str2 = key != null ? new String((byte[]) key) : "";
        Header lastHeader = consumerRecord.headers().lastHeader("c");
        Object obj = null;
        try {
            byte[] bArr = (byte[]) consumerRecord.value();
            obj = lastHeader == null ? this.dataCodec.deserialize(str, str2, bArr) : Arrays.equals(lastHeader.value(), BINARY_TYPE) ? this.dataCodec.deserialize(str, str2, bArr) : this.dataCodec.deserializeText(str, str2, bArr);
            if (this.messageInterceptor != null) {
                this.messageInterceptor.preHandle(str, str2, obj);
            }
            Object handle = this.messageHandlers.handle(str2, obj);
            if (this.messageInterceptor != null) {
                this.messageInterceptor.postHandle(str, str2, obj, handle);
            }
        } catch (Throwable th) {
            if (this.messageInterceptor != null) {
                this.messageInterceptor.postHandle(str, str2, obj, th);
            } else {
                this.logger.warn("handle command: {}, message: {} error", new Object[]{str2, obj, th});
            }
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    public void setMessageInterceptor(EzyKafkaMessageInterceptor ezyKafkaMessageInterceptor) {
        this.messageInterceptor = ezyKafkaMessageInterceptor;
    }
}
