package vip.isass.core.mq.kafka011.consumer;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import vip.isass.core.mq.core.SubscribeModel;
import vip.isass.core.mq.core.consumer.EventListener;
import vip.isass.core.mq.core.consumer.MqConsumer;
import vip.isass.core.mq.kafka011.Kafka011Const;
import vip.isass.core.mq.kafka011.config.InstanceConfiguration;
import vip.isass.core.mq.kafka011.config.Kafka011ConfigUtil;
import vip.isass.core.mq.kafka011.config.Kafka011Configuration;
import vip.isass.core.serialization.JacksonSerializable;
import vip.isass.core.support.FunctionUtil;
import vip.isass.core.support.JsonUtil;

@Scope("prototype")
@Component
/* loaded from: input_file:vip/isass/core/mq/kafka011/consumer/Kafka011Consumer.class */
public class Kafka011Consumer implements MqConsumer {
    private static final Logger log = LoggerFactory.getLogger(Kafka011Consumer.class);
    private static final ExecutorService executorService = Executors.newFixedThreadPool(3);
    private EventListener eventListener;
    private SubscribeModel subscribeModel;
    private String region;
    private String instance;
    private String producerId;
    private String consumerId;
    private String topic;
    private String tag;
    private Integer consumeThreadNumber;
    private Consumer consumer;
    private Object runtimeBean;
    private Method runtimeMethod;

    @Resource
    private Kafka011Configuration kafka011Configuration;

    public String getManufacturer() {
        return Kafka011Const.MANUFACTURER;
    }

    public void subscribe() {
        executorService.execute(() -> {
            log.info("开始订阅事件,consumerId[{}]", this.consumerId);
            Assert.notBlank(this.consumerId);
            InstanceConfiguration selectInstance = Kafka011ConfigUtil.selectInstance(this.kafka011Configuration, this.instance);
            this.instance = selectInstance.getInstanceName();
            Properties properties = new Properties();
            properties.put("bootstrap.servers", selectInstance.getServers());
            properties.put("group.id", this.consumerId);
            FunctionUtil.consumeIfNotNull(selectInstance.getEnableAutoCommit(), str -> {
                properties.put("enable.auto.commit", str);
            });
            FunctionUtil.consumeIfNotNull(selectInstance.getAutoCommitIntervalMs(), str2 -> {
                properties.put("auto.commit.interval.ms", str2);
            });
            FunctionUtil.consumeIfNotNull(selectInstance.getAutoOffsetReset(), str3 -> {
                properties.put("auto.offset.reset", str3);
            });
            FunctionUtil.consumeIfNotNull(selectInstance.getSessionTimeoutMs(), str4 -> {
                properties.put("session.timeout.ms", str4);
            });
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
            kafkaConsumer.subscribe(Arrays.asList(parseTopic(selectInstance)));
            while (true) {
                Iterator it = kafkaConsumer.poll(100L).iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    log.debug("收到mq消息：{}", consumerRecord);
                    try {
                        try {
                            this.runtimeMethod.invoke(this.runtimeBean, genParameter(consumerRecord));
                        } catch (Throwable th) {
                            log.error("mq消费错误：{}", th.getMessage(), th);
                            Throwable unwrap = ExceptionUtil.unwrap(th);
                            log.error("{}", unwrap.getMessage(), unwrap);
                        }
                    } catch (Exception e) {
                        log.error("反序列化mq消息错误，此消息将标记为消费成功：{}，", e.getMessage(), e);
                    }
                }
            }
        });
    }

    private String parseTopic(InstanceConfiguration instanceConfiguration) {
        if (StrUtil.isNotBlank(this.eventListener.topic())) {
            return this.eventListener.topic();
        }
        int messageType = this.eventListener.messageType();
        switch (messageType) {
            case 1:
                return instanceConfiguration.getCommonMessageTopic();
            case 2:
                return instanceConfiguration.getTimingMessageTopic();
            case 3:
                return instanceConfiguration.getTimingMessageTopic();
            case 4:
                throw new UnsupportedOperationException("未支持事务消息");
            case 5:
                return instanceConfiguration.getShardingSequentialMessageTopic();
            case 6:
                return instanceConfiguration.getGlobalSequentialMessageTopic();
            default:
                throw new UnsupportedOperationException("未支持消息类型:" + messageType);
        }
    }

    private Object[] genParameter(ConsumerRecord consumerRecord) {
        Class<?>[] parameterTypes = this.runtimeMethod.getParameterTypes();
        Object[] objArr = null;
        if (parameterTypes.length > 0) {
            objArr = new Object[parameterTypes.length];
            for (int i = 0; i < parameterTypes.length; i++) {
                Class<?> cls = parameterTypes[i];
                String obj = consumerRecord.value().toString();
                if (ConsumerRecord.class.isAssignableFrom(cls)) {
                    objArr[i] = consumerRecord;
                } else if (JacksonSerializable.class.isAssignableFrom(cls)) {
                    Field field = ReflectUtil.getField(cls, "TYPE_REFERENCE");
                    Assert.notNull(field, "[{}]是JacksonSerializable的实现，但没有TYPE_REFERENCE方法", new Object[]{cls});
                    objArr[i] = JsonUtil.DEFAULT_INSTANCE.readValue(obj, (TypeReference) field.get(null));
                } else {
                    objArr[i] = JSONUtil.isJson(obj) ? JsonUtil.DEFAULT_INSTANCE.readValue(obj, cls) : Convert.convert(cls, obj);
                }
            }
        }
        return objArr;
    }

    @PreDestroy
    public void destroy() {
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    public EventListener getEventListener() {
        return this.eventListener;
    }

    public SubscribeModel getSubscribeModel() {
        return this.subscribeModel;
    }

    public String getRegion() {
        return this.region;
    }

    public String getInstance() {
        return this.instance;
    }

    public String getProducerId() {
        return this.producerId;
    }

    public String getConsumerId() {
        return this.consumerId;
    }

    public String getTopic() {
        return this.topic;
    }

    public String getTag() {
        return this.tag;
    }

    public Integer getConsumeThreadNumber() {
        return this.consumeThreadNumber;
    }

    public Consumer getConsumer() {
        return this.consumer;
    }

    public Object getRuntimeBean() {
        return this.runtimeBean;
    }

    public Method getRuntimeMethod() {
        return this.runtimeMethod;
    }

    public Kafka011Configuration getKafka011Configuration() {
        return this.kafka011Configuration;
    }

    /* renamed from: setEventListener, reason: merged with bridge method [inline-methods] */
    public Kafka011Consumer m8setEventListener(EventListener eventListener) {
        this.eventListener = eventListener;
        return this;
    }

    /* renamed from: setSubscribeModel, reason: merged with bridge method [inline-methods] */
    public Kafka011Consumer m7setSubscribeModel(SubscribeModel subscribeModel) {
        this.subscribeModel = subscribeModel;
        return this;
    }

    public Kafka011Consumer setRegion(String str) {
        this.region = str;
        return this;
    }

    public Kafka011Consumer setInstance(String str) {
        this.instance = str;
        return this;
    }

    public Kafka011Consumer setProducerId(String str) {
        this.producerId = str;
        return this;
    }

    /* renamed from: setConsumerId, reason: merged with bridge method [inline-methods] */
    public Kafka011Consumer m6setConsumerId(String str) {
        this.consumerId = str;
        return this;
    }

    /* renamed from: setTopic, reason: merged with bridge method [inline-methods] */
    public Kafka011Consumer m5setTopic(String str) {
        this.topic = str;
        return this;
    }

    /* renamed from: setTag, reason: merged with bridge method [inline-methods] */
    public Kafka011Consumer m4setTag(String str) {
        this.tag = str;
        return this;
    }

    /* renamed from: setConsumeThreadNumber, reason: merged with bridge method [inline-methods] */
    public Kafka011Consumer m3setConsumeThreadNumber(Integer num) {
        this.consumeThreadNumber = num;
        return this;
    }

    public Kafka011Consumer setConsumer(Consumer consumer) {
        this.consumer = consumer;
        return this;
    }

    /* renamed from: setRuntimeBean, reason: merged with bridge method [inline-methods] */
    public Kafka011Consumer m2setRuntimeBean(Object obj) {
        this.runtimeBean = obj;
        return this;
    }

    /* renamed from: setRuntimeMethod, reason: merged with bridge method [inline-methods] */
    public Kafka011Consumer m1setRuntimeMethod(Method method) {
        this.runtimeMethod = method;
        return this;
    }

    public Kafka011Consumer setKafka011Configuration(Kafka011Configuration kafka011Configuration) {
        this.kafka011Configuration = kafka011Configuration;
        return this;
    }
}
