package org.sdase.commons.server.kafka;

import io.dropwizard.core.Configuration;
import io.dropwizard.core.ConfiguredBundle;
import io.dropwizard.core.setup.Bootstrap;
import io.dropwizard.core.setup.Environment;
import io.micrometer.core.instrument.Metrics;
import jakarta.validation.constraints.NotNull;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.sdase.commons.server.dropwizard.lifecycle.ManagedShutdownListener;
import org.sdase.commons.server.dropwizard.metadata.MetadataContext;
import org.sdase.commons.server.kafka.builder.MessageListenerRegistration;
import org.sdase.commons.server.kafka.builder.ProducerRegistration;
import org.sdase.commons.server.kafka.config.ConsumerConfig;
import org.sdase.commons.server.kafka.config.ListenerConfig;
import org.sdase.commons.server.kafka.config.ProducerConfig;
import org.sdase.commons.server.kafka.config.TopicConfig;
import org.sdase.commons.server.kafka.consumer.MessageListener;
import org.sdase.commons.server.kafka.exception.ConfigurationException;
import org.sdase.commons.server.kafka.health.ExternalKafkaHealthCheck;
import org.sdase.commons.server.kafka.health.KafkaHealthCheck;
import org.sdase.commons.server.kafka.producer.KafkaMessageProducer;
import org.sdase.commons.server.kafka.producer.MessageProducer;
import org.sdase.commons.server.kafka.producer.MetadataContextAwareKafkaProducer;
import org.sdase.commons.server.kafka.producer.TraceTokenAwareKafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/sdase/commons/server/kafka/KafkaBundle.class */
public class KafkaBundle<C extends Configuration> implements ConfiguredBundle<C> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBundle.class);
    public static final String HEALTHCHECK_NAME = "kafkaConnection";
    public static final String EXTERNAL_HEALTHCHECK_NAME = "kafkaConnectionExternal";
    private final Function<C, KafkaConfiguration> configurationProvider;
    private KafkaConfiguration kafkaConfiguration;
    private final boolean healthCheckDisabled;
    private final String healthCheckName;
    private final List<MessageListener<?, ?>> messageListeners = new ArrayList();
    private final List<ThreadedMessageListener<?, ?>> threadedMessageListeners = new ArrayList();
    private final Map<String, KafkaMessageProducer<?, ?>> messageProducers = new HashMap();
    private KafkaHealthCheck kafkaHealthCheck;
    private MicrometerProducerListener micrometerProducerListener;
    private MicrometerConsumerListener micrometerConsumerListener;

    /* loaded from: input_file:org/sdase/commons/server/kafka/KafkaBundle$Builder.class */
    public static class Builder<T extends Configuration> implements InitialBuilder, FinalBuilder<T> {
        private KafkaConfigurationProvider<T> configurationProvider;
        private boolean healthCheckDisabled = false;
        private String healthCheckName = null;

        private Builder() {
        }

        private Builder(KafkaConfigurationProvider<T> kafkaConfigurationProvider) {
            this.configurationProvider = kafkaConfigurationProvider;
        }

        @Override // org.sdase.commons.server.kafka.KafkaBundle.FinalBuilder
        public FinalBuilder<T> withoutHealthCheck() {
            this.healthCheckDisabled = true;
            return this;
        }

        @Override // org.sdase.commons.server.kafka.KafkaBundle.FinalBuilder
        public FinalBuilder<T> withHealthCheckName(String str) {
            this.healthCheckName = str;
            return this;
        }

        @Override // org.sdase.commons.server.kafka.KafkaBundle.FinalBuilder
        public KafkaBundle<T> build() {
            return new KafkaBundle<>(this.configurationProvider, this.healthCheckDisabled, this.healthCheckName);
        }

        @Override // org.sdase.commons.server.kafka.KafkaBundle.InitialBuilder
        public <C extends Configuration> FinalBuilder<C> withConfigurationProvider(KafkaConfigurationProvider<C> kafkaConfigurationProvider) {
            return new Builder(kafkaConfigurationProvider);
        }
    }

    /* loaded from: input_file:org/sdase/commons/server/kafka/KafkaBundle$FinalBuilder.class */
    public interface FinalBuilder<T extends Configuration> {
        FinalBuilder<T> withoutHealthCheck();

        FinalBuilder<T> withHealthCheckName(String str);

        KafkaBundle<T> build();
    }

    /* loaded from: input_file:org/sdase/commons/server/kafka/KafkaBundle$InitialBuilder.class */
    public interface InitialBuilder {
        <C extends Configuration> FinalBuilder<C> withConfigurationProvider(@NotNull KafkaConfigurationProvider<C> kafkaConfigurationProvider);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/sdase/commons/server/kafka/KafkaBundle$ThreadedMessageListener.class */
    public static final class ThreadedMessageListener<K, V> extends Record {
        private final MessageListener<K, V> messageListener;
        private final Thread thread;
        private final String clientId;

        private ThreadedMessageListener(MessageListener<K, V> messageListener, Thread thread, String str) {
            this.messageListener = messageListener;
            this.thread = thread;
            this.clientId = str;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, ThreadedMessageListener.class), ThreadedMessageListener.class, "messageListener;thread;clientId", "FIELD:Lorg/sdase/commons/server/kafka/KafkaBundle$ThreadedMessageListener;->messageListener:Lorg/sdase/commons/server/kafka/consumer/MessageListener;", "FIELD:Lorg/sdase/commons/server/kafka/KafkaBundle$ThreadedMessageListener;->thread:Ljava/lang/Thread;", "FIELD:Lorg/sdase/commons/server/kafka/KafkaBundle$ThreadedMessageListener;->clientId:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, ThreadedMessageListener.class), ThreadedMessageListener.class, "messageListener;thread;clientId", "FIELD:Lorg/sdase/commons/server/kafka/KafkaBundle$ThreadedMessageListener;->messageListener:Lorg/sdase/commons/server/kafka/consumer/MessageListener;", "FIELD:Lorg/sdase/commons/server/kafka/KafkaBundle$ThreadedMessageListener;->thread:Ljava/lang/Thread;", "FIELD:Lorg/sdase/commons/server/kafka/KafkaBundle$ThreadedMessageListener;->clientId:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, ThreadedMessageListener.class, Object.class), ThreadedMessageListener.class, "messageListener;thread;clientId", "FIELD:Lorg/sdase/commons/server/kafka/KafkaBundle$ThreadedMessageListener;->messageListener:Lorg/sdase/commons/server/kafka/consumer/MessageListener;", "FIELD:Lorg/sdase/commons/server/kafka/KafkaBundle$ThreadedMessageListener;->thread:Ljava/lang/Thread;", "FIELD:Lorg/sdase/commons/server/kafka/KafkaBundle$ThreadedMessageListener;->clientId:Ljava/lang/String;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public MessageListener<K, V> messageListener() {
            return this.messageListener;
        }

        public Thread thread() {
            return this.thread;
        }

        public String clientId() {
            return this.clientId;
        }
    }

    private KafkaBundle(KafkaConfigurationProvider<C> kafkaConfigurationProvider, boolean z, String str) {
        this.configurationProvider = kafkaConfigurationProvider;
        this.healthCheckDisabled = z;
        this.healthCheckName = str;
    }

    public static InitialBuilder builder() {
        return new Builder();
    }

    public void initialize(Bootstrap<?> bootstrap) {
    }

    public void run(C c, Environment environment) {
        this.kafkaConfiguration = this.configurationProvider.apply(c);
        if (!this.kafkaConfiguration.isDisabled()) {
            if (this.healthCheckDisabled) {
                this.kafkaHealthCheck = new ExternalKafkaHealthCheck(this.kafkaConfiguration);
                environment.healthChecks().register(this.healthCheckName != null ? this.healthCheckName : EXTERNAL_HEALTHCHECK_NAME, this.kafkaHealthCheck);
            } else {
                this.kafkaHealthCheck = new KafkaHealthCheck(this.kafkaConfiguration);
                environment.healthChecks().register(this.healthCheckName != null ? this.healthCheckName : HEALTHCHECK_NAME, this.kafkaHealthCheck);
            }
        }
        this.micrometerProducerListener = new MicrometerProducerListener(Metrics.globalRegistry);
        this.micrometerConsumerListener = new MicrometerConsumerListener(Metrics.globalRegistry);
        setupManagedThreadManager(environment);
    }

    public TopicConfig getTopicConfiguration(String str) throws ConfigurationException {
        if (this.kafkaConfiguration.getTopics().get(str) == null) {
            throw new ConfigurationException(String.format("Topic with key '%s' seems not to be part of the read configuration. Please check the name and configuration.", str));
        }
        return this.kafkaConfiguration.getTopics().get(str);
    }

    public <K, V> List<MessageListener<K, V>> createMessageListener(MessageListenerRegistration<K, V> messageListenerRegistration) {
        if (this.kafkaConfiguration.isDisabled()) {
            return Collections.emptyList();
        }
        checkInit();
        ListenerConfig listenerConfig = messageListenerRegistration.getListenerConfig();
        if (listenerConfig == null && messageListenerRegistration.getListenerConfigName() != null) {
            listenerConfig = this.kafkaConfiguration.getListenerConfig().get(messageListenerRegistration.getListenerConfigName());
            if (listenerConfig == null) {
                throw new ConfigurationException(String.format("Listener config with name '%s' cannot be found within the current configuration.", messageListenerRegistration.getListenerConfigName()));
            }
        }
        if (listenerConfig == null) {
            throw new ConfigurationException("No valid listener config given within the MessageHandlerRegistration");
        }
        if (messageListenerRegistration.getStrategy() == null) {
            throw new IllegalStateException("A strategy is mandatory for message listeners.");
        }
        ArrayList arrayList = new ArrayList(listenerConfig.getInstances());
        for (int i = 0; i < listenerConfig.getInstances(); i++) {
            messageListenerRegistration.getStrategy().init(MetadataContext.metadataFields());
            MessageListener messageListener = new MessageListener(messageListenerRegistration.getTopicsNames(), createConsumer(messageListenerRegistration, i), listenerConfig, messageListenerRegistration.getStrategy());
            arrayList.add(messageListener);
            Thread thread = new Thread(messageListener);
            thread.start();
            this.threadedMessageListeners.add(new ThreadedMessageListener<>(messageListener, thread, (String) messageListener.getConsumer().metrics().entrySet().stream().findFirst().map(entry -> {
                return (String) ((MetricName) entry.getKey()).tags().get("client-id");
            }).orElse("")));
        }
        this.messageListeners.addAll(arrayList);
        return arrayList;
    }

    public <K, V> MessageProducer<K, V> registerProducer(ProducerRegistration<K, V> producerRegistration) throws ConfigurationException {
        if (this.kafkaConfiguration.isDisabled()) {
            return (obj, obj2, headers, callback) -> {
                return null;
            };
        }
        checkInit();
        Producer<K, V> createProducer = createProducer(producerRegistration);
        Map.Entry<K, V> orElse = createProducer.metrics().entrySet().stream().findFirst().orElse(null);
        String str = orElse != null ? (String) ((MetricName) orElse.getKey()).tags().get("client-id") : "";
        KafkaMessageProducer<?, ?> kafkaMessageProducer = new KafkaMessageProducer<>(producerRegistration.getTopic().getName(), createProducer);
        if (this.micrometerProducerListener == null) {
            LOGGER.warn("MicrometerProducerListener is not initialized! Metrics will not be recorded.");
        } else {
            this.micrometerProducerListener.producerAdded(str, createProducer);
        }
        this.messageProducers.put(str, kafkaMessageProducer);
        return kafkaMessageProducer;
    }

    public <K, V> KafkaConsumer<K, V> createConsumer(Deserializer<K> deserializer, Deserializer<V> deserializer2, String str) {
        return createConsumer(deserializer, deserializer2, getConsumerConfiguration(str), 0);
    }

    public <K, V> KafkaConsumer<K, V> createConsumer(Deserializer<K> deserializer, Deserializer<V> deserializer2, ConsumerConfig consumerConfig, int i) {
        KafkaProperties forConsumer = KafkaProperties.forConsumer(this.kafkaConfiguration);
        if (consumerConfig != null) {
            forConsumer.putAll(consumerConfig.getConfig());
            forConsumer.put("client.id", consumerConfig.getClientId() + "-" + i);
        }
        Consumer<K, V> kafkaConsumer = new KafkaConsumer<>(forConsumer, deserializer, deserializer2);
        if (this.micrometerConsumerListener == null) {
            LOGGER.warn("MicrometerConsumerListener is not initialized! Metrics will not be recorded.");
        } else {
            this.micrometerConsumerListener.consumerAdded(forConsumer.getProperty("client.id"), kafkaConsumer);
        }
        return kafkaConsumer;
    }

    public <K, V> Producer<K, V> createProducer(Serializer<K> serializer, Serializer<V> serializer2, ProducerConfig producerConfig) {
        KafkaProperties forProducer = KafkaProperties.forProducer(this.kafkaConfiguration);
        if (producerConfig != null) {
            forProducer.putAll(producerConfig.getConfig());
        }
        MetadataContextAwareKafkaProducer kafkaProducer = new KafkaProducer(forProducer, serializer, serializer2);
        if (!MetadataContext.metadataFields().isEmpty()) {
            kafkaProducer = new MetadataContextAwareKafkaProducer(kafkaProducer, MetadataContext.metadataFields());
        }
        return new TraceTokenAwareKafkaProducer(kafkaProducer);
    }

    public <K, V> Producer<K, V> createProducer(Serializer<K> serializer, Serializer<V> serializer2, String str) {
        ProducerConfig producerConfiguration = getProducerConfiguration(str);
        if (producerConfiguration != null && producerConfiguration.getClientId() == null) {
            producerConfiguration.setClientId(str);
        }
        return createProducer(serializer, serializer2, producerConfiguration);
    }

    private <K, V> Producer<K, V> createProducer(ProducerRegistration<K, V> producerRegistration) {
        ProducerConfig producerConfig = producerRegistration.getProducerConfig();
        if (producerConfig == null && producerRegistration.getProducerConfigName() != null) {
            producerConfig = getProducerConfiguration(producerRegistration.getProducerConfigName());
        }
        if (producerConfig != null && producerConfig.getClientId() == null) {
            producerConfig.setClientId(producerRegistration.getProducerConfigName());
        }
        return createProducer(producerRegistration.getKeySerializer(), producerRegistration.getValueSerializer(), producerConfig);
    }

    public ConsumerConfig getConsumerConfiguration(String str) {
        if (this.kafkaConfiguration.getConsumers().containsKey(str)) {
            return this.kafkaConfiguration.getConsumers().get(str);
        }
        throw new ConfigurationException(String.format("Consumer config with name '%s' cannot be found within the current configuration.", str));
    }

    public ProducerConfig getProducerConfiguration(String str) {
        if (this.kafkaConfiguration.getProducers().containsKey(str)) {
            return this.kafkaConfiguration.getProducers().get(str);
        }
        throw new ConfigurationException(String.format("Producer config with name '%s' cannot be found within the current configuration.", str));
    }

    private <K, V> KafkaConsumer<K, V> createConsumer(MessageListenerRegistration<K, V> messageListenerRegistration, int i) {
        ConsumerConfig consumerConfig = messageListenerRegistration.getConsumerConfig();
        if (consumerConfig == null && messageListenerRegistration.getConsumerConfigName() != null) {
            consumerConfig = getConsumerConfiguration(messageListenerRegistration.getConsumerConfigName());
        }
        if (consumerConfig != null) {
            applyForcedConfigFromStrategy(messageListenerRegistration, consumerConfig);
            messageListenerRegistration.getStrategy().verifyConsumerConfig(consumerConfig.getConfig());
        }
        if (consumerConfig != null && consumerConfig.getClientId() == null) {
            consumerConfig.setClientId(messageListenerRegistration.getConsumerConfigName());
        }
        return createConsumer(messageListenerRegistration.getKeyDeserializer(), messageListenerRegistration.getValueDeserializer(), consumerConfig, i);
    }

    private void applyForcedConfigFromStrategy(MessageListenerRegistration<?, ?> messageListenerRegistration, ConsumerConfig consumerConfig) {
        HashMap hashMap = new HashMap(consumerConfig.getConfig());
        for (Map.Entry<String, String> entry : messageListenerRegistration.getStrategy().forcedConfigToApply().entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            String str = hashMap.get(key);
            if (str == null) {
                LOGGER.info("Setting in consumer config: '{}'='{}' (forced from strategy {})", new Object[]{key, value, messageListenerRegistration.getStrategy().getClass().getSimpleName()});
            } else if (!value.equals(str)) {
                LOGGER.warn("Overwriting in consumer config: '{}'='{}' with new value '{}' (forced from strategy {})", new Object[]{key, str, value, messageListenerRegistration.getStrategy().getClass().getSimpleName()});
            }
            hashMap.put(key, value);
        }
        consumerConfig.setConfig(hashMap);
    }

    private void checkInit() {
        if (this.kafkaConfiguration == null || this.micrometerProducerListener == null || this.micrometerConsumerListener == null) {
            throw new IllegalStateException("KafkaConfiguration not yet initialized!");
        }
    }

    private void setupManagedThreadManager(Environment environment) {
        environment.lifecycle().manage(ManagedShutdownListener.onShutdown(() -> {
            shutdownConsumerThreads();
            stopProducers();
            shutdownKafkaHealthCheck();
        }));
    }

    private void stopProducers() {
        this.messageProducers.forEach((str, kafkaMessageProducer) -> {
            try {
                try {
                    kafkaMessageProducer.close();
                    this.micrometerProducerListener.producerRemoved(str);
                } catch (InterruptException e) {
                    LOGGER.error("Error closing producer", e);
                    Thread.currentThread().interrupt();
                    this.micrometerProducerListener.producerRemoved(str);
                }
            } catch (Throwable th) {
                this.micrometerProducerListener.producerRemoved(str);
                throw th;
            }
        });
    }

    private void shutdownConsumerThreads() {
        this.threadedMessageListeners.forEach(threadedMessageListener -> {
            threadedMessageListener.messageListener.stopConsumer();
        });
        this.threadedMessageListeners.forEach(threadedMessageListener2 -> {
            this.micrometerConsumerListener.consumerRemoved(threadedMessageListener2.clientId);
        });
        this.threadedMessageListeners.forEach(threadedMessageListener3 -> {
            try {
                threadedMessageListener3.thread.join();
            } catch (InterruptedException e) {
                LOGGER.warn("Error while shutting down consumer threads", e);
                Thread.currentThread().interrupt();
            }
        });
    }

    private void shutdownKafkaHealthCheck() {
        if (this.kafkaHealthCheck != null) {
            this.kafkaHealthCheck.shutdown();
        }
    }
}
