package me.ehp246.aufkafka.core.consumer;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import me.ehp246.aufkafka.api.consumer.InboundConsumerExecutorProvider;
import me.ehp246.aufkafka.api.consumer.InboundConsumerListener;
import me.ehp246.aufkafka.api.consumer.InboundEndpoint;
import me.ehp246.aufkafka.api.consumer.InvocableBinder;
import me.ehp246.aufkafka.api.consumer.LoggingDispatchingListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.spi.LoggingEventBuilder;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;

/* loaded from: input_file:me/ehp246/aufkafka/core/consumer/InboundEndpointConsumerConfigurer.class */
public final class InboundEndpointConsumerConfigurer implements SmartInitializingSingleton, AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(InboundEndpointConsumerConfigurer.class);
    private final List<InboundEndpoint> endpoints;
    private final InboundConsumerExecutorProvider executorProvider;
    private final InvocableBinder binder;
    private final ConsumerProvider consumerProvider;
    private final List<InboundConsumerListener.DispatchingListener> onDispatching;
    private final AutowireCapableBeanFactory autowireCapableBeanFactory;
    private final DefaultInboundConsumerRegistry consumerRegistry;

    public InboundEndpointConsumerConfigurer(List<InboundEndpoint> list, InboundConsumerExecutorProvider inboundConsumerExecutorProvider, ConsumerProvider consumerProvider, InvocableBinder invocableBinder, LoggingDispatchingListener loggingDispatchingListener, AutowireCapableBeanFactory autowireCapableBeanFactory, DefaultInboundConsumerRegistry defaultInboundConsumerRegistry) {
        this.endpoints = list;
        this.executorProvider = inboundConsumerExecutorProvider;
        this.consumerProvider = consumerProvider;
        this.binder = invocableBinder;
        this.onDispatching = loggingDispatchingListener == null ? List.of() : List.of(loggingDispatchingListener);
        this.autowireCapableBeanFactory = autowireCapableBeanFactory;
        this.consumerRegistry = defaultInboundConsumerRegistry;
    }

    public void afterSingletonsInstantiated() {
        for (InboundEndpoint inboundEndpoint : this.endpoints) {
            LoggingEventBuilder message = LOGGER.atTrace().setMessage("Registering '{}' on '{}'");
            Objects.requireNonNull(inboundEndpoint);
            message.addArgument(inboundEndpoint::name).addArgument(() -> {
                return inboundEndpoint.from().topic();
            }).log();
            Consumer<String, String> consumer = this.consumerProvider.get(inboundEndpoint.consumerConfigName(), inboundEndpoint.consumerProperties());
            consumer.subscribe(Set.of(inboundEndpoint.from().topic()));
            InboundConsumerRunner inboundConsumerRunner = new InboundConsumerRunner(consumer, new DefaultInvocableDispatcher(this.binder, inboundEndpoint.invocationListener() == null ? null : List.of(inboundEndpoint.invocationListener()), null), new AutowireCapableInvocableFactory(this.autowireCapableBeanFactory, inboundEndpoint.keyRegistry()), this.onDispatching, inboundEndpoint.unmatchedConsumer(), inboundEndpoint.consumerExceptionListener());
            this.consumerRegistry.put(inboundEndpoint.name(), inboundConsumerRunner);
            this.executorProvider.get().execute(inboundConsumerRunner);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.consumerRegistry.getNames().forEach(str -> {
            this.consumerRegistry.remove(str).consumer().close();
        });
    }
}
