package io.goodforgod.testcontainers.extensions.kafka;

import io.goodforgod.testcontainers.extensions.AbstractTestcontainersExtension;
import io.goodforgod.testcontainers.extensions.ContainerMode;
import io.goodforgod.testcontainers.extensions.ExtensionContainer;
import io.goodforgod.testcontainers.extensions.kafka.ContainerKafkaConnection;
import io.goodforgod.testcontainers.extensions.kafka.Topics;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.platform.commons.util.ReflectionUtils;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

@ApiStatus.Internal
/* loaded from: input_file:io/goodforgod/testcontainers/extensions/kafka/TestcontainersKafkaExtension.class */
final class TestcontainersKafkaExtension extends AbstractTestcontainersExtension<KafkaConnection, KafkaContainer, KafkaMetadata> {
    private static final String EXTERNAL_TEST_KAFKA_BOOTSTRAP = "EXTERNAL_TEST_KAFKA_BOOTSTRAP_SERVERS";
    private static final String EXTERNAL_TEST_KAFKA_PREFIX = "EXTERNAL_TEST_KAFKA_";
    private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create(new Object[]{TestcontainersKafkaExtension.class});

    TestcontainersKafkaExtension() {
    }

    protected Class<? extends Annotation> getContainerAnnotation() {
        return ContainerKafka.class;
    }

    protected Class<? extends Annotation> getConnectionAnnotation() {
        return ContainerKafkaConnection.class;
    }

    protected Class<KafkaConnection> getConnectionType() {
        return KafkaConnection.class;
    }

    protected Class<KafkaContainer> getContainerType() {
        return KafkaContainer.class;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaContainer getContainerDefault(KafkaMetadata kafkaMetadata) {
        KafkaContainer withStartupTimeout = new KafkaContainer(DockerImageName.parse(kafkaMetadata.image()).asCompatibleSubstituteFor(DockerImageName.parse("confluentinc/cp-kafka"))).withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger(KafkaContainer.class)).withMdc("image", kafkaMetadata.image()).withMdc("alias", kafkaMetadata.networkAliasOrDefault())).withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false").withEnv("AUTO_CREATE_TOPICS", "true").withEnv("KAFKA_LOG4J_LOGGERS", "org.apache.zookeeper=ERROR,org.kafka.zookeeper=ERROR,kafka.zookeeper=ERROR,org.apache.kafka=ERROR,kafka=ERROR,kafka.network=ERROR,kafka.cluster=ERROR,kafka.controller=ERROR,kafka.coordinator=INFO,kafka.log=ERROR,kafka.server=ERROR,state.change.logger=ERROR").withEnv("ZOOKEEPER_LOG4J_LOGGERS", "org.apache.zookeeper=ERROR,org.kafka.zookeeper=ERROR,org.kafka.zookeeper.server=ERROR,kafka.zookeeper=ERROR,org.apache.kafka=ERROR").withEmbeddedZookeeper().withExposedPorts(new Integer[]{9092, 9093}).waitingFor(Wait.forListeningPort()).withStartupTimeout(Duration.ofMinutes(5L));
        withStartupTimeout.setNetworkAliases(new ArrayList(List.of(kafkaMetadata.networkAliasOrDefault())));
        if (kafkaMetadata.networkShared()) {
            withStartupTimeout.withNetwork(Network.SHARED);
        }
        return withStartupTimeout;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaConnection getConnectionForContainer(KafkaMetadata kafkaMetadata, KafkaContainer kafkaContainer) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
        return new KafkaConnectionImpl(properties, (Properties) kafkaContainer.getNetworkAliases().stream().filter(str -> {
            return str.equals(kafkaMetadata.networkAliasOrDefault());
        }).findFirst().or(() -> {
            return kafkaContainer.getNetworkAliases().stream().findFirst();
        }).map(str2 -> {
            Properties properties2 = new Properties();
            properties2.put("bootstrap.servers", String.format("%s:%s", str2, "9092"));
            return properties2;
        }).orElse(null));
    }

    protected ExtensionContext.Namespace getNamespace() {
        return NAMESPACE;
    }

    @NotNull
    protected Optional<KafkaMetadata> findMetadata(@NotNull ExtensionContext extensionContext) {
        return findAnnotation(TestcontainersKafka.class, extensionContext).map(testcontainersKafka -> {
            return new KafkaMetadata(testcontainersKafka.network().shared(), testcontainersKafka.network().alias(), testcontainersKafka.image(), testcontainersKafka.mode(), Set.of((Object[]) testcontainersKafka.topics().value()), testcontainersKafka.topics().reset());
        });
    }

    @NotNull
    protected Optional<KafkaConnection> getConnectionExternal() {
        if (System.getenv(EXTERNAL_TEST_KAFKA_BOOTSTRAP) == null) {
            return Optional.empty();
        }
        Properties properties = new Properties();
        System.getenv().forEach((str, str2) -> {
            if (str.startsWith(EXTERNAL_TEST_KAFKA_PREFIX)) {
                properties.put(str.replace(EXTERNAL_TEST_KAFKA_PREFIX, "").replace("_", ".").toLowerCase(), str2);
            }
        });
        return Optional.of(new KafkaConnectionImpl(properties, null));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void injectConnection(KafkaConnection kafkaConnection, ExtensionContext extensionContext) {
        Class<? extends Annotation> connectionAnnotation = getConnectionAnnotation();
        List findFields = ReflectionUtils.findFields(extensionContext.getRequiredTestClass(), field -> {
            return (field.isSynthetic() || Modifier.isFinal(field.getModifiers()) || Modifier.isStatic(field.getModifiers()) || field.getAnnotation(connectionAnnotation) == null) ? false : true;
        }, ReflectionUtils.HierarchyTraversalMode.TOP_DOWN);
        this.logger.debug("Starting @ContainerKafkaConnection field injection for container properties: {}", kafkaConnection);
        KafkaExtensionContainer kafkaExtensionContainer = (KafkaExtensionContainer) getStorage(extensionContext).get(getMetadata(extensionContext).runMode(), KafkaExtensionContainer.class);
        extensionContext.getTestInstance().ifPresent(obj -> {
            Properties properties;
            KafkaConnectionImpl kafkaConnectionImpl;
            Iterator it = findFields.iterator();
            while (it.hasNext()) {
                Field field2 = (Field) it.next();
                try {
                    ContainerKafkaConnection containerKafkaConnection = (ContainerKafkaConnection) field2.getAnnotation(ContainerKafkaConnection.class);
                    if (containerKafkaConnection.properties().length == 0) {
                        kafkaConnectionImpl = (KafkaConnectionImpl) kafkaConnection;
                    } else {
                        Properties properties2 = new Properties();
                        properties2.putAll(kafkaConnection.params().properties());
                        Arrays.stream(containerKafkaConnection.properties()).forEach(property -> {
                            properties2.put(property.name(), property.value());
                        });
                        if (kafkaConnection.paramsInNetwork().isEmpty()) {
                            properties = null;
                        } else {
                            properties = new Properties();
                            properties.putAll(kafkaConnection.paramsInNetwork().get().properties());
                            Arrays.stream(containerKafkaConnection.properties()).forEach(property2 -> {
                                properties.put(property2.name(), property2.value());
                            });
                        }
                        kafkaConnectionImpl = new KafkaConnectionImpl(properties2, properties);
                        kafkaExtensionContainer.pool().add(kafkaConnectionImpl);
                    }
                    field2.setAccessible(true);
                    field2.set(obj, kafkaConnectionImpl);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(String.format("Field '%s' annotated with @%s can't set kafka connection", field2.getName(), connectionAnnotation.getSimpleName()), e);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ExtensionContainer<KafkaContainer, KafkaConnection> getExtensionContainer(KafkaContainer kafkaContainer, KafkaConnection kafkaConnection) {
        return new KafkaExtensionContainer(kafkaContainer, kafkaConnection);
    }

    public void beforeAll(ExtensionContext extensionContext) {
        super.beforeAll(extensionContext);
        KafkaMetadata metadata = getMetadata(extensionContext);
        if (metadata.topics().isEmpty()) {
            return;
        }
        KafkaConnection kafkaConnection = (KafkaConnection) getConnectionCurrent(extensionContext);
        if (metadata.runMode() == ContainerMode.PER_RUN) {
            KafkaConnectionImpl.createTopicsIfNeeded(kafkaConnection, metadata.topics(), metadata.reset() == Topics.Mode.PER_CLASS);
        } else if (metadata.runMode() == ContainerMode.PER_CLASS) {
            KafkaConnectionImpl.createTopicsIfNeeded(kafkaConnection, metadata.topics(), false);
        }
    }

    public void beforeEach(ExtensionContext extensionContext) {
        super.beforeEach(extensionContext);
        KafkaMetadata metadata = getMetadata(extensionContext);
        if (metadata.topics().isEmpty()) {
            return;
        }
        KafkaConnection kafkaConnection = (KafkaConnection) getConnectionCurrent(extensionContext);
        if (metadata.runMode() == ContainerMode.PER_METHOD) {
            KafkaConnectionImpl.createTopicsIfNeeded(kafkaConnection, metadata.topics(), false);
        } else if (metadata.reset() == Topics.Mode.PER_METHOD) {
            KafkaConnectionImpl.createTopicsIfNeeded(kafkaConnection, metadata.topics(), true);
        }
    }

    public void afterEach(ExtensionContext extensionContext) {
        KafkaMetadata metadata = getMetadata(extensionContext);
        KafkaExtensionContainer kafkaExtensionContainer = (KafkaExtensionContainer) getStorage(extensionContext).get(metadata.runMode(), KafkaExtensionContainer.class);
        if (metadata.runMode() != ContainerMode.PER_METHOD) {
            kafkaExtensionContainer.pool().clear();
        }
        super.afterEach(extensionContext);
    }

    public void afterAll(ExtensionContext extensionContext) {
        super.afterAll(extensionContext);
    }

    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
        KafkaConnection kafkaConnection = (KafkaConnection) super.resolveParameter(parameterContext, extensionContext);
        if (kafkaConnection == null) {
            return null;
        }
        Properties properties = kafkaConnection.params().properties();
        Properties properties2 = kafkaConnection.paramsInNetwork().isEmpty() ? null : kafkaConnection.paramsInNetwork().get().properties();
        ContainerKafkaConnection containerKafkaConnection = (ContainerKafkaConnection) parameterContext.getParameter().getAnnotation(ContainerKafkaConnection.class);
        if (containerKafkaConnection.properties().length == 0) {
            return kafkaConnection;
        }
        for (ContainerKafkaConnection.Property property : containerKafkaConnection.properties()) {
            properties.put(property.name(), property.value());
            if (properties2 != null) {
                properties2.put(property.name(), property.value());
            }
        }
        KafkaExtensionContainer kafkaExtensionContainer = (KafkaExtensionContainer) getStorage(extensionContext).get(getMetadata(extensionContext).runMode(), KafkaExtensionContainer.class);
        KafkaConnectionImpl kafkaConnectionImpl = new KafkaConnectionImpl(properties, properties2);
        kafkaExtensionContainer.pool().add(kafkaConnectionImpl);
        return kafkaConnectionImpl;
    }
}
