package me.ahoo.eventbus.kafka;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import me.ahoo.eventbus.core.consistency.ConsistencySubscriber;
import me.ahoo.eventbus.core.consistency.ConsistencySubscriberFactory;
import me.ahoo.eventbus.core.subscriber.Subscriber;
import me.ahoo.eventbus.core.subscriber.SubscriberRegistry;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;

/* loaded from: input_file:me/ahoo/eventbus/kafka/KafkaSubscriberRegistry.class */
/**
 * {@link SubscriberRegistry} implementation that exposes each subscriber as a Kafka listener.
 *
 * <p>On {@link #subscribe(Subscriber)} the subscriber is wrapped by the configured
 * {@link ConsistencySubscriberFactory}, registered with the
 * {@link KafkaListenerEndpointRegistry} as a {@link MethodKafkaListenerEndpoint}, and then
 * stored by name so that it can be retrieved through {@link #getSubscriber(String)}.
 */
public class KafkaSubscriberRegistry implements SubscriberRegistry {
    private final KafkaEventCodec kafkaEventCodec;
    /** Wrapped subscribers keyed by {@link Subscriber#getName()}. */
    private final ConcurrentHashMap<String, Subscriber> nameMapSubscribers = new ConcurrentHashMap<>();
    private final MessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
    private final ConsistencySubscriberFactory subscriberFactory;
    private final KafkaListenerEndpointRegistry listenerEndpointRegistry;
    // Wildcard instead of a raw type: the concrete container type is irrelevant to this class.
    private final KafkaListenerContainerFactory<?> listenerContainerFactory;

    /**
     * Creates a registry backed by the given Kafka infrastructure.
     *
     * @param kafkaEventCodec codec handed to every {@code MethodKafkaListenerAdapter}
     * @param consistencySubscriberFactory factory used to wrap each subscriber before registration
     * @param kafkaListenerEndpointRegistry registry that receives the listener containers
     * @param kafkaListenerContainerFactory factory passed along when registering a listener container
     * @throws NullPointerException if any argument is {@code null}
     */
    public KafkaSubscriberRegistry(
            KafkaEventCodec kafkaEventCodec,
            ConsistencySubscriberFactory consistencySubscriberFactory,
            KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry,
            KafkaListenerContainerFactory<?> kafkaListenerContainerFactory) {
        // Fail fast here rather than with an anonymous NPE when the first subscriber is registered.
        this.kafkaEventCodec = Objects.requireNonNull(kafkaEventCodec, "kafkaEventCodec");
        this.subscriberFactory = Objects.requireNonNull(consistencySubscriberFactory, "consistencySubscriberFactory");
        this.listenerEndpointRegistry = Objects.requireNonNull(kafkaListenerEndpointRegistry, "kafkaListenerEndpointRegistry");
        this.listenerContainerFactory = Objects.requireNonNull(kafkaListenerContainerFactory, "kafkaListenerContainerFactory");
    }

    /**
     * Wraps the subscriber via the {@link ConsistencySubscriberFactory}, registers a Kafka
     * listener for it and records the wrapped instance under its name.
     *
     * <p>The listener is registered before the subscriber is recorded, so if listener
     * registration throws, the name lookup map is left unchanged.
     *
     * @param subscriber the subscriber to register
     */
    public void subscribe(Subscriber subscriber) {
        ConsistencySubscriber consistencySubscriber = this.subscriberFactory.create(subscriber);
        registerListener(consistencySubscriber);
        registerSubscriber(consistencySubscriber);
    }

    /**
     * Builds a Kafka listener endpoint for the subscriber and hands it to the endpoint registry.
     *
     * <p>The subscriber name is used both as the endpoint id and as the consumer group id; the
     * subscribed event name is used as the single topic.
     */
    private void registerListener(Subscriber subscriber) {
        MethodKafkaListenerAdapter listenerAdapter = new MethodKafkaListenerAdapter(this.kafkaEventCodec, subscriber);
        String subscriberName = subscriber.getName();

        MethodKafkaListenerEndpoint<?, ?> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setBean(listenerAdapter);
        endpoint.setMethod(MethodKafkaListenerAdapter.getInvokeMethod());
        endpoint.setId(subscriberName);
        endpoint.setGroupId(subscriberName);
        endpoint.setTopics(subscriber.getSubscribeEventName());
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);

        // Third argument is Spring Kafka's "startImmediately" flag.
        this.listenerEndpointRegistry.registerListenerContainer(endpoint, this.listenerContainerFactory, true);
    }

    /** Stores the subscriber under its name, replacing any previous entry with the same name. */
    private void registerSubscriber(Subscriber subscriber) {
        this.nameMapSubscribers.put(subscriber.getName(), subscriber);
    }

    /**
     * Looks up a previously registered subscriber by name.
     *
     * @param str the subscriber name
     * @return the wrapped subscriber stored under that name, or {@code null} if there is none
     * @throws NullPointerException if {@code str} is {@code null} (the backing
     *         {@link ConcurrentHashMap} rejects null keys)
     */
    public Subscriber getSubscriber(String str) {
        return this.nameMapSubscribers.get(str);
    }
}
