package cn.hippo4j.adapter.kafka;

import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.ReflectUtil;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.cglib.core.Constants;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericErrorHandler;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.support.TopicPartitionOffset;

/* loaded from: input_file:cn/hippo4j/adapter/kafka/KafkaThreadPoolAdapter.class */
public class KafkaThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaThreadPoolAdapter.class);
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public String mark() {
        return "Kafka";
    }

    public ThreadPoolAdapterState getThreadPoolState(String str) {
        ThreadPoolAdapterState threadPoolAdapterState = new ThreadPoolAdapterState();
        ConcurrentMessageListenerContainer listenerContainer = this.kafkaListenerEndpointRegistry.getListenerContainer(str);
        if (listenerContainer == null) {
            log.warn("[{}] Kafka consuming thread pool not found.", str);
            return threadPoolAdapterState;
        }
        threadPoolAdapterState.setThreadPoolKey(str);
        if (listenerContainer instanceof ConcurrentMessageListenerContainer) {
            threadPoolAdapterState.setCoreSize(Integer.valueOf(listenerContainer.getConcurrency()));
            threadPoolAdapterState.setMaximumSize(threadPoolAdapterState.getCoreSize());
        } else {
            threadPoolAdapterState.setCoreSize(1);
            threadPoolAdapterState.setMaximumSize(1);
        }
        return threadPoolAdapterState;
    }

    public List<ThreadPoolAdapterState> getThreadPoolStates() {
        ArrayList arrayList = new ArrayList();
        this.kafkaListenerEndpointRegistry.getListenerContainerIds().forEach(str -> {
            arrayList.add(getThreadPoolState(str));
        });
        return arrayList;
    }

    public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
        String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
        ConcurrentMessageListenerContainer listenerContainer = this.kafkaListenerEndpointRegistry.getListenerContainer(threadPoolKey);
        if (listenerContainer == null) {
            log.warn("[{}] Kafka consuming thread pool not found.", threadPoolKey);
            return false;
        }
        if (!(listenerContainer instanceof ConcurrentMessageListenerContainer)) {
            log.warn("[{}] Kafka consuming thread pool not support modify.", threadPoolKey);
            return false;
        }
        ConcurrentMessageListenerContainer concurrentMessageListenerContainer = listenerContainer;
        int concurrency = concurrentMessageListenerContainer.getConcurrency();
        Integer corePoolSize = threadPoolAdapterParameter.getCorePoolSize();
        if (concurrency >= corePoolSize.intValue()) {
            decreaseConsumer(threadPoolKey, concurrentMessageListenerContainer, concurrency, corePoolSize);
        } else if (!addConsumer(threadPoolKey, concurrentMessageListenerContainer, concurrency, corePoolSize)) {
            return false;
        }
        concurrentMessageListenerContainer.setConcurrency(corePoolSize.intValue());
        log.info("[{}] Kafka consumption thread pool parameter change. coreSize: {}, maximumSize: {}", new Object[]{threadPoolKey, String.format("%s => %s", Integer.valueOf(concurrency), corePoolSize), String.format("%s => %s", Integer.valueOf(concurrency), corePoolSize)});
        return true;
    }

    private static void decreaseConsumer(String str, ConcurrentMessageListenerContainer concurrentMessageListenerContainer, int i, Integer num) {
        int intValue = i - num.intValue();
        Iterator it = ((List) ReflectUtil.getFieldValue(concurrentMessageListenerContainer, "containers")).iterator();
        int i2 = 0;
        while (it.hasNext() && i2 < intValue) {
            KafkaMessageListenerContainer kafkaMessageListenerContainer = (KafkaMessageListenerContainer) it.next();
            if (kafkaMessageListenerContainer.isRunning()) {
                kafkaMessageListenerContainer.stop(() -> {
                });
                i2++;
            }
        }
        log.info("[{}] Kafka consumption change. target decrease {} ,real decrease {}", new Object[]{str, Integer.valueOf(intValue), Integer.valueOf(i2)});
    }

    private static boolean addConsumer(String str, ConcurrentMessageListenerContainer concurrentMessageListenerContainer, int i, Integer num) {
        ContainerProperties containerProperties = concurrentMessageListenerContainer.getContainerProperties();
        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions != null && num.intValue() > topicPartitions.length) {
            log.warn("[{}] Kafka consuming thread pool not support modify. When specific partitions are provided, the concurrency must be less than or equal to the number of partitions;", str);
            return false;
        }
        List list = (List) ReflectUtil.getFieldValue(concurrentMessageListenerContainer, "containers");
        boolean booleanValue = ((Boolean) ReflectUtil.getFieldValue(concurrentMessageListenerContainer, "alwaysClientIdSuffix")).booleanValue();
        int size = list.size();
        for (int i2 = size; i2 < (num.intValue() - i) + size; i2++) {
            KafkaMessageListenerContainer kafkaMessageListenerContainer = (KafkaMessageListenerContainer) ReflectUtil.invoke(concurrentMessageListenerContainer, "constructContainer", new Object[]{containerProperties, topicPartitions, Integer.valueOf(i2)});
            String beanName = concurrentMessageListenerContainer.getBeanName();
            kafkaMessageListenerContainer.setBeanName((beanName != null ? beanName : "consumer") + "-" + i2);
            kafkaMessageListenerContainer.setApplicationContext(ApplicationContextHolder.getInstance());
            if (concurrentMessageListenerContainer.getApplicationEventPublisher() != null) {
                kafkaMessageListenerContainer.setApplicationEventPublisher(concurrentMessageListenerContainer.getApplicationEventPublisher());
            }
            kafkaMessageListenerContainer.setClientIdSuffix((num.intValue() > 1 || booleanValue) ? "-" + i2 : "");
            kafkaMessageListenerContainer.setGenericErrorHandler((GenericErrorHandler) ReflectUtil.invoke(concurrentMessageListenerContainer, "getGenericErrorHandler", new Object[0]));
            kafkaMessageListenerContainer.setAfterRollbackProcessor((AfterRollbackProcessor) ReflectUtil.invoke(concurrentMessageListenerContainer, "getAfterRollbackProcessor", new Object[0]));
            Method findDeclaredMethod = ReflectUtil.findDeclaredMethod(concurrentMessageListenerContainer.getClass(), "getRecordInterceptor", Constants.EMPTY_CLASS_ARRAY);
            ReflectUtil.setAccessible(findDeclaredMethod);
            kafkaMessageListenerContainer.setRecordInterceptor((RecordInterceptor) ReflectUtil.invoke(concurrentMessageListenerContainer, findDeclaredMethod, new Object[0]));
            Method findDeclaredMethod2 = ReflectUtil.findDeclaredMethod(concurrentMessageListenerContainer.getClass(), "isInterceptBeforeTx", Constants.EMPTY_CLASS_ARRAY);
            ReflectUtil.setAccessible(findDeclaredMethod2);
            kafkaMessageListenerContainer.setInterceptBeforeTx(((Boolean) ReflectUtil.invoke(concurrentMessageListenerContainer, findDeclaredMethod2, new Object[0])).booleanValue());
            kafkaMessageListenerContainer.setEmergencyStop(() -> {
                concurrentMessageListenerContainer.stop(() -> {
                });
                ReflectUtil.invoke(concurrentMessageListenerContainer, "publishContainerStoppedEvent", new Object[0]);
            });
            Method findDeclaredMethod3 = ReflectUtil.findDeclaredMethod(concurrentMessageListenerContainer.getClass(), "isPaused", Constants.EMPTY_CLASS_ARRAY);
            ReflectUtil.setAccessible(findDeclaredMethod3);
            if (((Boolean) ReflectUtil.invoke(concurrentMessageListenerContainer, findDeclaredMethod3, new Object[0])).booleanValue()) {
                kafkaMessageListenerContainer.pause();
            }
            kafkaMessageListenerContainer.start();
            list.add(kafkaMessageListenerContainer);
        }
        return true;
    }

    public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {
        try {
            this.kafkaListenerEndpointRegistry = (KafkaListenerEndpointRegistry) ApplicationContextHolder.getBean(KafkaListenerEndpointRegistry.class);
        } catch (Exception e) {
            log.error("Failed to get Kafka thread pool.", e);
        }
    }
}
