package cn.shazhengbo.kafka.event.subscriber;

import cn.shazhengbo.kafka.annotation.EventMessage;
import cn.shazhengbo.kafka.annotation.EventMessageListener;
import cn.shazhengbo.kafka.config.SysConfig;
import cn.shazhengbo.kafka.event.ServiceHelper;
import cn.shazhengbo.kafka.event.listener.EventKafkaEventListener;
import cn.shazhengbo.kafka.message.KafkaEventMessageHandler;
import cn.shazhengbo.kafka.utils.aop.AopTargetUtils;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:cn/shazhengbo/kafka/event/subscriber/EventSubscriber.class */
public class EventSubscriber implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(EventSubscriber.class);
    private ApplicationContext applicationContext;
    private final SysConfig sysConfig;
    private final ConsumerFactory consumerFactory;
    private final ConcurrentMap<String, ConcurrentMessageListenerContainer<String, String>> consumers = new ConcurrentHashMap();

    @Autowired
    public EventSubscriber(SysConfig sysConfig, ConsumerFactory consumerFactory) {
        this.sysConfig = sysConfig;
        this.consumerFactory = consumerFactory;
    }

    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        this.applicationContext = contextRefreshedEvent.getApplicationContext();
        init(contextRefreshedEvent.getApplicationContext().getBeansOfType(KafkaEventMessageHandler.class).values());
    }

    private void init(Collection<KafkaEventMessageHandler> collection) {
        collection.forEach(kafkaEventMessageHandler -> {
            KafkaEventMessageHandler kafkaEventMessageHandler = (KafkaEventMessageHandler) AopTargetUtils.getTarget(kafkaEventMessageHandler);
            log.info(String.format("开始注册消息处理器：%s", kafkaEventMessageHandler.getClass().getName()));
            Type[] genericInterfaces = kafkaEventMessageHandler.getClass().getGenericInterfaces();
            if (genericInterfaces[0].getTypeName().equals(KafkaEventMessageHandler.class.getTypeName())) {
                return;
            }
            Class cls = (Class) ((ParameterizedType) genericInterfaces[0]).getActualTypeArguments()[0];
            subscribe(ServiceHelper.retrieveMessageListener(kafkaEventMessageHandler.getClass()), cls, kafkaEventMessageHandler);
            log.info(String.format("已注册消息【%s】的处理器：%s", cls.getName(), kafkaEventMessageHandler.getClass().getName()));
        });
    }

    public <T> void subscribe(EventMessageListener eventMessageListener, Class<T> cls, KafkaEventMessageHandler<T> kafkaEventMessageHandler) {
        EventMessage retrieveLeopardMessage = ServiceHelper.retrieveLeopardMessage(cls);
        EventKafkaEventListener eventKafkaEventListener = (EventKafkaEventListener) this.applicationContext.getBean(EventKafkaEventListener.class);
        eventKafkaEventListener.setHandler(kafkaEventMessageHandler);
        eventKafkaEventListener.setConsumerGroup(eventMessageListener.group());
        eventKafkaEventListener.setEvent(cls);
        subscribe(eventMessageListener, retrieveLeopardMessage.topic(), eventKafkaEventListener);
    }

    private <T> void subscribe(EventMessageListener eventMessageListener, String str, Object obj) {
        String calculateHashCode = calculateHashCode(eventMessageListener.group(), str);
        String generateTopic = generateTopic(str);
        ConcurrentMessageListenerContainer<String, String> createListenerContainer = ServiceHelper.createListenerContainer(eventMessageListener, generateTopic, this.consumerFactory, obj);
        ConcurrentMessageListenerContainer<String, String> putIfAbsent = this.consumers.putIfAbsent(calculateHashCode, createListenerContainer);
        ConcurrentMessageListenerContainer<String, String> concurrentMessageListenerContainer = putIfAbsent == null ? createListenerContainer : putIfAbsent;
        if (!concurrentMessageListenerContainer.isRunning()) {
            concurrentMessageListenerContainer.start();
        }
        log.info(String.format("已注册 %s 的消费者 %s", generateTopic, eventMessageListener.group()));
    }

    private String generateTopic(String str) {
        return String.format("%s.%s", this.sysConfig.getTopicPrefix(), str);
    }

    private String calculateHashCode(String str, String str2) {
        return String.format("%s.%s", str, str2);
    }
}
