package org.apache.flink.connector.testframe.container;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.slf4j.Logger;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/connector/testframe/container/FlinkTestcontainersConfigurator.class */
/**
 * Assembles the Testcontainers objects (JobManager, TaskManagers and, optionally, Zookeeper) that
 * make up a {@link FlinkContainers} instance, based on a {@link FlinkContainersSettings} and a
 * {@link TestcontainersSettings}.
 *
 * <p>This class only configures containers; nothing in it starts them.
 */
public class FlinkTestcontainersConfigurator {
    private final TestcontainersSettings testcontainersSettings;
    private final FlinkContainersSettings flinkContainersSettings;

    /**
     * Creates a configurator.
     *
     * @param flinkContainersSettings Flink-specific settings (configuration, hostnames, base
     *     image, number of TaskManagers, storage paths)
     * @param testcontainersSettings generic container settings (network, logger, environment
     *     variables, dependencies)
     */
    public FlinkTestcontainersConfigurator(
            FlinkContainersSettings flinkContainersSettings,
            TestcontainersSettings testcontainersSettings) {
        this.testcontainersSettings = testcontainersSettings;
        this.flinkContainersSettings = flinkContainersSettings;
    }

    /**
     * Builds the JobManager image and wraps it in a configured container that exposes the REST
     * port taken from the Flink configuration.
     *
     * @param tempDirectory temporary directory handed to the image builder
     * @return the configured JobManager container
     * @throws RuntimeException if the image cannot be built
     */
    private GenericContainer<?> configureJobManagerContainer(Path tempDirectory) {
        // Work on a copy so the settings' own configuration object is never modified here.
        final Configuration jobManagerConf = new Configuration();
        jobManagerConf.addAll(flinkContainersSettings.getFlinkConfig());
        try {
            return configureContainer(
                            new GenericContainer<>(
                                    new FlinkImageBuilder()
                                            .setTempDirectory(tempDirectory)
                                            .setConfiguration(jobManagerConf)
                                            .setLogProperties(
                                                    flinkContainersSettings.getLogProperties())
                                            .setBaseImage(flinkContainersSettings.getBaseImage())
                                            .asJobManager()
                                            .build()),
                            flinkContainersSettings.getJobManagerHostname(),
                            "JobManager")
                    .withExposedPorts(jobManagerConf.get(RestOptions.PORT));
        } catch (ImageBuildException e) {
            throw new RuntimeException("Failed to build JobManager image", e);
        }
    }

    /**
     * Builds one image and one configured container per TaskManager. TaskManager {@code i} uses
     * the hostname {@code <taskManagerHostnamePrefix><i>}, both as its
     * {@link TaskManagerOptions#HOST} setting and as its network alias.
     *
     * @param tempDirectory temporary directory handed to the image builder
     * @return the configured TaskManager containers, in index order
     * @throws RuntimeException if any image cannot be built
     */
    private List<GenericContainer<?>> configureTaskManagerContainers(Path tempDirectory) {
        final List<GenericContainer<?>> taskManagers = new ArrayList<>();
        for (int i = 0; i < flinkContainersSettings.getNumTaskManagers(); i++) {
            // Each TaskManager gets its own copy of the configuration with its own hostname.
            final Configuration taskManagerConf = new Configuration();
            taskManagerConf.addAll(flinkContainersSettings.getFlinkConfig());
            final String taskManagerHostname =
                    flinkContainersSettings.getTaskManagerHostnamePrefix() + i;
            taskManagerConf.set(TaskManagerOptions.HOST, taskManagerHostname);
            try {
                taskManagers.add(
                        configureContainer(
                                new GenericContainer<>(
                                        new FlinkImageBuilder()
                                                .setTempDirectory(tempDirectory)
                                                .setConfiguration(taskManagerConf)
                                                .setLogProperties(
                                                        flinkContainersSettings
                                                                .getLogProperties())
                                                .setBaseImage(
                                                        flinkContainersSettings.getBaseImage())
                                                .asTaskManager()
                                                .build()),
                                taskManagerHostname,
                                "TaskManager-" + i));
            } catch (ImageBuildException e) {
                throw new RuntimeException("Failed to build TaskManager image", e);
            }
        }
        return taskManagers;
    }

    /**
     * Creates a configured container from the {@code zookeeper:3.5.9} image.
     *
     * @return the configured Zookeeper container
     */
    private GenericContainer<?> configureZookeeperContainer() {
        return configureContainer(
                new GenericContainer<>(DockerImageName.parse("zookeeper").withTag("3.5.9")),
                flinkContainersSettings.getZookeeperHostname(),
                "Zookeeper");
    }

    /**
     * Applies the shared settings to a container: dependencies, network, network alias, optional
     * log consumer, environment variables and working directory.
     *
     * @param container the container to configure (modified in place)
     * @param networkAlias alias under which the container is reachable on the shared network
     * @param logPrefix prefix for the container's log lines; only used when a logger is set
     * @return the same {@code container} instance
     */
    private GenericContainer<?> configureContainer(
            GenericContainer<?> container, String networkAlias, String logPrefix) {
        // Every dependency joins the shared network and is registered as a prerequisite.
        for (GenericContainer<?> dependency : testcontainersSettings.getDependencies()) {
            dependency.withNetwork(testcontainersSettings.getNetwork());
            container.dependsOn(dependency);
        }
        container.withNetwork(testcontainersSettings.getNetwork());
        container.withNetworkAliases(networkAlias);
        // Container output is forwarded only when the caller supplied a logger.
        final Logger logger = testcontainersSettings.getLogger();
        if (logger != null) {
            container.withLogConsumer(new Slf4jLogConsumer(logger).withPrefix(logPrefix));
        }
        container.withEnv(testcontainersSettings.getEnvVars());
        container.withWorkingDirectory(flinkContainersSettings.getFlinkHome());
        return container;
    }

    /**
     * Configures all containers and bundles them into a {@link FlinkContainers} instance.
     *
     * <p>A Zookeeper container is configured, and an HA storage directory mounted into the
     * JobManager, only when Zookeeper HA is enabled in the settings; otherwise the Zookeeper
     * container passed to {@link FlinkContainers} is {@code null}. A checkpoint directory is
     * always mounted into the JobManager.
     *
     * @return the configured (not started) Flink containers
     * @throws RuntimeException if the image-build temporary directory cannot be created or an
     *     image cannot be built
     * @throws IllegalStateException if a mounted temporary directory cannot be created
     */
    public FlinkContainers configure() {
        // Only the temp-directory creation can raise IOException, so the try is kept narrow.
        final Path imageBuildingTempDir;
        try {
            imageBuildingTempDir = Files.createTempDirectory("flink-image-build");
        } catch (IOException e) {
            throw new RuntimeException("Failed to create temporary directory", e);
        }

        final GenericContainer<?> jobManager = configureJobManagerContainer(imageBuildingTempDir);
        final List<GenericContainer<?>> taskManagers =
                configureTaskManagerContainers(imageBuildingTempDir);

        GenericContainer<?> zookeeper = null;
        if (flinkContainersSettings.isZookeeperHA()) {
            zookeeper = configureZookeeperContainer();
            createTempDirAndMountToContainer(
                    "flink-recovery", flinkContainersSettings.getHaStoragePath(), jobManager);
        }

        createTempDirAndMountToContainer(
                "flink-checkpoint", flinkContainersSettings.getCheckpointPath(), jobManager);

        return new FlinkContainers(
                jobManager, taskManagers, zookeeper, flinkContainersSettings.getFlinkConfig());
    }

    /**
     * Creates a temporary directory on the host, opens its permissions to all users, and binds it
     * into the container at the given path.
     *
     * @param tempDirPrefix prefix for the name of the temporary host directory
     * @param containerPath path inside the container where the directory is mounted
     * @param container container that receives the bind mount
     * @throws IllegalStateException if the temporary directory cannot be created
     */
    void createTempDirAndMountToContainer(
            String tempDirPrefix, String containerPath, GenericContainer<?> container) {
        try {
            final Path tempDirPath = Files.createTempDirectory(tempDirPrefix);
            final File tempDir = tempDirPath.toFile();
            // Best effort: the boolean results are intentionally not checked. The second
            // argument (ownerOnly = false) applies each permission to everybody.
            tempDir.setReadable(true, false);
            tempDir.setWritable(true, false);
            tempDir.setExecutable(true, false);
            container.withFileSystemBind(tempDirPath.toAbsolutePath().toString(), containerPath);
        } catch (IOException e) {
            // This method serves more than the recovery directory, so report the actual prefix.
            throw new IllegalStateException(
                    "Failed to create temporary directory with prefix '" + tempDirPrefix + "'", e);
        }
    }
}
