package ru.inovus.messaging.mq.support.kafka;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import ru.inovus.messaging.channel.api.queue.MqConsumer;
import ru.inovus.messaging.channel.api.queue.MqProvider;
import ru.inovus.messaging.channel.api.queue.TopicMqConsumer;

/* loaded from: input_file:ru/inovus/messaging/mq/support/kafka/KafkaMqProvider.class */
/**
 * {@link MqProvider} implementation backed by Apache Kafka.
 *
 * <p>Messages are published through the injected {@link KafkaTemplate}. Every subscription gets
 * its own {@link KafkaMessageListenerContainer}; running containers are tracked by subscriber key
 * so that {@link #unsubscribe(Serializable)} can stop them again.
 */
public class KafkaMqProvider implements MqProvider {
    /** Running listener containers, keyed by the value returned from {@code MqConsumer.subscriber()}. */
    private final Map<Serializable, MessageListenerContainer> containers = new ConcurrentHashMap<>();
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final KafkaProperties properties;

    /**
     * Creates the provider.
     *
     * @param kafkaTemplate   template used by {@link #publish(Object, String)}
     * @param kafkaProperties source of the base consumer configuration for new subscriptions
     */
    public KafkaMqProvider(KafkaTemplate<String, Object> kafkaTemplate, KafkaProperties kafkaProperties) {
        this.kafkaTemplate = kafkaTemplate;
        this.properties = kafkaProperties;
    }

    /**
     * Starts a listener container on the topic named by {@code mqConsumer.mqName()} and forwards
     * the value of every received record to {@code mqConsumer.messageHandler()}.
     *
     * <p>If a container is already registered for the same subscriber it is replaced and stopped,
     * so a repeated subscription does not leave an unreachable container running.
     *
     * @param mqConsumer subscription descriptor
     * @throws NullPointerException if {@code mqConsumer.subscriber()} returns {@code null}
     */
    public void subscribe(MqConsumer mqConsumer) {
        // Resolve the key before anything is started: ConcurrentHashMap rejects null keys, and
        // failing only after the container is running would leave it with no way to be stopped.
        Serializable subscriber = mqConsumer.subscriber();
        if (subscriber == null) {
            throw new NullPointerException("mqConsumer.subscriber() must not be null");
        }

        ContainerProperties containerProperties = new ContainerProperties(mqConsumer.mqName());
        // NOTE(review): spring-kafka declares ContainerProperties#setMessageListener(Object), which
        // gives a lambda no functional-interface target type. This statement likely needs an explicit
        // MessageListener cast whose value type matches MqConsumer.messageHandler() - confirm.
        containerProperties.setMessageListener(consumerRecord -> {
            mqConsumer.messageHandler().accept(consumerRecord.value());
        });

        MessageListenerContainer previous =
                this.containers.put(subscriber, createContainer(getConsumerConfigs(mqConsumer), containerProperties));
        if (previous != null) {
            // Same subscriber registered twice: shut the superseded container down.
            previous.stop();
        }
    }

    /**
     * Sends {@code obj} to the topic {@code str}. The record key is the current
     * {@link System#currentTimeMillis()} value rendered as a string. The value returned by
     * {@code KafkaTemplate.send} is not inspected here.
     *
     * @param obj payload to send
     * @param str destination topic name
     */
    public void publish(Object obj, String str) {
        this.kafkaTemplate.send(str, String.valueOf(System.currentTimeMillis()), obj);
    }

    /**
     * Stops and forgets the container registered for the given subscriber; does nothing when no
     * container is registered under that key.
     *
     * @param serializable subscriber key previously supplied via {@code MqConsumer.subscriber()}
     */
    public void unsubscribe(Serializable serializable) {
        MessageListenerContainer remove = this.containers.remove(serializable);
        if (remove != null) {
            remove.stop();
        }
    }

    /** Builds a listener container from the given consumer configuration and starts it. */
    private MessageListenerContainer createContainer(Map<String, Object> map, ContainerProperties containerProperties) {
        DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(map);
        KafkaMessageListenerContainer<String, Object> kafkaMessageListenerContainer =
                new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        kafkaMessageListenerContainer.start();
        return kafkaMessageListenerContainer;
    }

    /**
     * Assembles the Kafka consumer configuration for one subscription: the base properties from
     * {@link KafkaProperties}, overridden with the deserializers, {@code auto.offset.reset=earliest}
     * and a {@code group.id} derived from the consumer.
     */
    private Map<String, Object> getConsumerConfigs(MqConsumer mqConsumer) {
        Map<String, Object> buildConsumerProperties = this.properties.buildConsumerProperties();
        buildConsumerProperties.put("key.deserializer", StringDeserializer.class);
        // NOTE(review): a class named "Serializer" is registered as the value deserializer;
        // presumably ObjectSerializer also implements Kafka's Deserializer - confirm.
        buildConsumerProperties.put("value.deserializer", ObjectSerializer.class);
        buildConsumerProperties.put("auto.offset.reset", "earliest");
        String mqName = mqConsumer.mqName();
        if (mqConsumer instanceof TopicMqConsumer) {
            TopicMqConsumer topicConsumer = (TopicMqConsumer) mqConsumer;
            // Topic consumers get a group id of their own (mqName.tenantCode.authToken).
            // NOTE(review): authToken looks like a credential and group ids are visible to broker
            // tooling and logs - confirm this is acceptable. Changing the format would move
            // existing subscribers to a new consumer group, so it is left as is.
            mqName = mqName + "." + topicConsumer.tenantCode + "." + topicConsumer.authToken;
        }
        buildConsumerProperties.put("group.id", mqName);
        return buildConsumerProperties;
    }
}
