package org.apache.flink.connector.testframe.container;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.MountableFile;

/* loaded from: input_file:org/apache/flink/connector/testframe/container/FlinkContainers.class */
/**
 * JUnit 5 extension that drives a Flink cluster made of Testcontainers containers: one JobManager,
 * a list of TaskManagers and an optional high-availability (HA) service container.
 *
 * <p>The cluster is started in {@link #beforeAll(ExtensionContext)} and stopped in {@link
 * #afterAll(ExtensionContext)}; {@link #start()} and {@link #stop()} can also be called directly.
 * Instances are created through {@link #builder()}.
 */
public class FlinkContainers implements BeforeAllCallback, AfterAllCallback {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkContainers.class);

    /**
     * Used as the HTTP read timeout of the JobManager REST readiness probe and as the deadline when
     * waiting for all TaskManagers to register.
     */
    public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);

    /** URI scheme prefix stripped from configured directories before deleting them. */
    private static final String FILE_SCHEME_PREFIX = "file:";

    /** Matches the JobID line printed on stdout for a detached submission. */
    private static final Pattern DETACHED_JOB_ID_PATTERN =
            Pattern.compile("Job has been submitted with JobID (.*)");

    /** Matches the JobID line printed on stdout once an attached job has finished. */
    private static final Pattern FINISHED_JOB_ID_PATTERN =
            Pattern.compile("Job with JobID (.*) has finished.");

    private final GenericContainer<?> jobManager;
    private final List<GenericContainer<?>> taskManagers;

    @Nullable
    private final GenericContainer<?> haService;
    private final Configuration conf;

    /** REST client for the running cluster; {@code null} until {@link #start()} has created it. */
    @Nullable
    private RestClusterClient<StandaloneClusterId> restClusterClient;
    private boolean isStarted = false;

    /** Builder collecting the settings from which a {@link FlinkContainers} instance is configured. */
    public static final class Builder {
        private FlinkContainersSettings flinkContainersSettings;
        private TestcontainersSettings testcontainersSettings;

        private Builder() {
            this.flinkContainersSettings = FlinkContainersSettings.defaultConfig();
            this.testcontainersSettings = TestcontainersSettings.defaultSettings();
        }

        /**
         * Replaces the default Flink container settings.
         *
         * @param flinkContainersSettings settings to use
         * @return this builder
         */
        public Builder withFlinkContainersSettings(FlinkContainersSettings flinkContainersSettings) {
            this.flinkContainersSettings = flinkContainersSettings;
            return this;
        }

        /**
         * Replaces the default Testcontainers settings.
         *
         * @param testcontainersSettings settings to use
         * @return this builder
         */
        public Builder withTestcontainersSettings(TestcontainersSettings testcontainersSettings) {
            this.testcontainersSettings = testcontainersSettings;
            return this;
        }

        /**
         * Builds the cluster by delegating to {@link FlinkTestcontainersConfigurator}.
         *
         * @return the configured (not yet started) {@link FlinkContainers}
         */
        public FlinkContainers build() {
            return new FlinkTestcontainersConfigurator(this.flinkContainersSettings, this.testcontainersSettings)
                    .configure();
        }
    }

    /**
     * Creates a builder pre-populated with default settings.
     *
     * @return a new {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Creates the cluster wrapper around already configured containers.
     *
     * @param jobManager JobManager container
     * @param taskManagers TaskManager containers
     * @param haService HA service container, or {@code null} if the cluster runs without HA
     * @param conf Flink configuration; read for the REST port and the directories cleaned on stop
     */
    public FlinkContainers(
            GenericContainer<?> jobManager,
            List<GenericContainer<?>> taskManagers,
            @Nullable GenericContainer<?> haService,
            Configuration conf) {
        this.jobManager = jobManager;
        this.taskManagers = taskManagers;
        this.haService = haService;
        this.conf = conf;
    }

    /**
     * Starts the HA service (if any), the JobManager and all TaskManagers, then blocks until the
     * JobManager reports every TaskManager as connected.
     *
     * @throws Exception if a container fails to start or the cluster does not become ready
     */
    public void start() throws Exception {
        if (this.haService != null) {
            LOG.debug("Starting HA service container");
            this.haService.start();
        }
        LOG.debug("Starting JobManager container");
        this.jobManager.start();
        waitUntilJobManagerRESTReachable(this.jobManager);
        LOG.debug("Starting TaskManager containers");
        this.taskManagers.parallelStream().forEach(GenericContainer::start);
        LOG.debug("Creating REST cluster client");
        this.restClusterClient = createClusterClient();
        waitUntilAllTaskManagerConnected();
        this.isStarted = true;
    }

    /**
     * Stops the cluster: closes the REST client, stops the TaskManagers, deletes the temporary files
     * inside the JobManager container and finally stops the JobManager and the HA service.
     *
     * <p>The JobManager and HA service containers are stopped even if an earlier step throws; the
     * original exception is still propagated to the caller.
     */
    public void stop() {
        this.isStarted = false;
        try {
            if (this.restClusterClient != null) {
                this.restClusterClient.close();
            }
            this.taskManagers.forEach(GenericContainer::stop);
            if (this.jobManager.isRunning()) {
                deleteJobManagerTemporaryFiles();
            } else {
                // Commands can only be executed in a running container, e.g. not after a failed start().
                LOG.warn("JobManager container is not running; skipping deletion of temporary files");
            }
        } finally {
            try {
                this.jobManager.stop();
            } finally {
                if (this.haService != null) {
                    this.haService.stop();
                }
            }
        }
    }

    /**
     * @return {@code true} once {@link #start()} has completed and until {@link #stop()} is called
     */
    public boolean isStarted() {
        return this.isStarted;
    }

    /** @return the JobManager container */
    public GenericContainer<?> getJobManager() {
        return this.jobManager;
    }

    /** @return the TaskManager containers (the internal list, not a copy) */
    public List<GenericContainer<?>> getTaskManagers() {
        return this.taskManagers;
    }

    /** @return the host under which the JobManager container is reachable */
    public String getJobManagerHost() {
        return this.jobManager.getHost();
    }

    /** @return the host port mapped to the JobManager's configured REST port */
    public int getJobManagerPort() {
        return this.jobManager.getMappedPort(this.conf.get(RestOptions.PORT));
    }

    /** @return the REST cluster client, or {@code null} if the cluster has not been started yet */
    @Nullable
    public RestClusterClient<StandaloneClusterId> getRestClusterClient() {
        return this.restClusterClient;
    }

    /**
     * Stops the JobManager, runs the given action, starts the JobManager again and waits until it is
     * reachable and all TaskManagers are connected. A new REST client is created afterwards.
     *
     * @param afterFailAction action executed while the JobManager is stopped
     * @throws Exception if the action fails or the cluster does not recover
     */
    public void restartJobManager(RunnableWithException afterFailAction) throws Exception {
        if (this.haService == null) {
            LOG.warn("Restarting JobManager without HA service. This might drop all your running jobs");
        }
        this.jobManager.stop();
        afterFailAction.run();
        this.jobManager.start();
        waitUntilJobManagerRESTReachable(this.jobManager);
        this.restClusterClient = createClusterClient();
        waitUntilAllTaskManagerConnected();
    }

    /**
     * Stops all TaskManagers, runs the given action and starts the TaskManagers again. This method
     * does not wait for the TaskManagers to reconnect to the JobManager.
     *
     * @param afterFailAction action executed while the TaskManagers are stopped
     * @throws Exception if the action fails
     */
    public void restartTaskManager(RunnableWithException afterFailAction) throws Exception {
        this.taskManagers.forEach(GenericContainer::stop);
        afterFailAction.run();
        this.taskManagers.forEach(GenericContainer::start);
    }

    /**
     * Submits a SQL job by copying the SQL script and the required jars into the JobManager
     * container and piping the script into {@code bin/sql-client.sh}.
     *
     * @param sqlJobSubmission SQL lines and jars to submit
     * @throws IOException if the script cannot be written or the command cannot be executed
     * @throws InterruptedException if the thread is interrupted while the command runs
     * @throws IllegalStateException if the cluster is not started
     * @throws AssertionError if the SQL client exits with a non-zero code
     */
    public void submitSQLJob(SQLJobSubmission sqlJobSubmission) throws IOException, InterruptedException {
        Preconditions.checkState(isStarted(), "SQL job submission is only applicable for a running cluster");
        final List<String> commands = new ArrayList<>();
        final Path scriptDirectory = Files.createTempDirectory("sql-script");
        final Path script = scriptDirectory.resolve("script");
        try {
            Files.write(script, sqlJobSubmission.getSqlLines());
            this.jobManager.copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql");
        } finally {
            // The script now lives inside the container (or the copy failed); either way the
            // host-side temporary files are no longer needed.
            deleteQuietly(script);
            deleteQuietly(scriptDirectory);
        }
        commands.add("cat /tmp/script.sql | ");
        commands.add("bin/sql-client.sh");
        for (String jar : sqlJobSubmission.getJars()) {
            commands.add("--jar");
            final Path jarPath = Paths.get(jar);
            final String containerPath = "/tmp/" + jarPath.getFileName();
            this.jobManager.copyFileToContainer(MountableFile.forHostPath(jarPath), containerPath);
            commands.add(containerPath);
        }
        final Container.ExecResult result =
                this.jobManager.execInContainer("bash", "-c", String.join(" ", commands));
        LOG.info(result.getStdout());
        LOG.error(result.getStderr());
        if (result.getExitCode() != 0) {
            throw new AssertionError("Failed when submitting the SQL job.");
        }
    }

    /**
     * Submits a job by copying its jar into the JobManager container and invoking {@code bin/flink
     * run} there. The JobID is parsed from the command's stdout.
     *
     * @param jobSubmission description of the job to run
     * @return the ID of the submitted job
     * @throws IOException if the command cannot be executed
     * @throws InterruptedException if the thread is interrupted while the command runs
     * @throws IllegalStateException if no JobID can be found in stdout
     */
    public JobID submitJob(JobSubmission jobSubmission) throws IOException, InterruptedException {
        final List<String> commands = new ArrayList<>();
        commands.add("bin/flink");
        commands.add("run");
        if (jobSubmission.isDetached()) {
            commands.add("-d");
        }
        if (jobSubmission.getParallelism() > 0) {
            commands.add("-p");
            commands.add(String.valueOf(jobSubmission.getParallelism()));
        }
        jobSubmission.getMainClass().ifPresent(mainClass -> {
            commands.add("--class");
            commands.add(mainClass);
        });
        final Path jobJar = jobSubmission.getJar();
        final String containerPath = "/tmp/" + jobJar.getFileName();
        commands.add(containerPath);
        this.jobManager.copyFileToContainer(MountableFile.forHostPath(jobJar.toAbsolutePath()), containerPath);
        commands.addAll(jobSubmission.getArguments());

        final String commandLine = String.join(" ", commands);
        LOG.info("Running {}.", commandLine);
        final Container.ExecResult result = this.jobManager.execInContainer("bash", "-c", commandLine);

        // Detached and attached submissions print the JobID in different messages.
        final Pattern jobIdPattern =
                jobSubmission.isDetached() ? DETACHED_JOB_ID_PATTERN : FINISHED_JOB_ID_PATTERN;
        final String stdout = result.getStdout();
        LOG.info(stdout);
        LOG.error(result.getStderr());
        final Matcher matcher = jobIdPattern.matcher(stdout);
        Preconditions.checkState(matcher.find(), "Cannot extract JobID from stdout.");
        return JobID.fromHexString(matcher.group(1));
    }

    /** Starts the cluster before any test of the extended class runs. */
    @Override
    public void beforeAll(ExtensionContext extensionContext) throws Exception {
        start();
    }

    /** Stops the cluster after all tests of the extended class have run. */
    @Override
    public void afterAll(ExtensionContext extensionContext) {
        stop();
    }

    /**
     * Creates a REST client pointing at the JobManager's mapped REST port, closing any previously
     * created client first.
     */
    private RestClusterClient<StandaloneClusterId> createClusterClient() throws Exception {
        Preconditions.checkState(this.jobManager.isRunning(), "JobManager should be running for creating a REST client");
        if (this.restClusterClient != null) {
            this.restClusterClient.close();
        }
        final Configuration clientConfiguration = new Configuration();
        clientConfiguration.set(RestOptions.ADDRESS, getJobManagerHost());
        clientConfiguration.set(RestOptions.PORT, this.jobManager.getMappedPort(this.conf.get(RestOptions.PORT)));
        return new RestClusterClient<>(clientConfiguration, StandaloneClusterId.getInstance());
    }

    /** Blocks until {@code /overview} on the JobManager's REST port answers with HTTP 200. */
    private void waitUntilJobManagerRESTReachable(GenericContainer<?> jobManagerContainer) {
        LOG.debug("Waiting for JobManager's REST interface getting ready");
        new HttpWaitStrategy()
                .forPort(this.conf.get(RestOptions.PORT))
                .forPath("/overview")
                .forStatusCode(200)
                .withReadTimeout(DEFAULT_TIMEOUT)
                .waitUntilReady(jobManagerContainer);
    }

    /**
     * Polls the cluster overview until the number of connected TaskManagers equals the number of
     * TaskManager containers, giving up after {@link #DEFAULT_TIMEOUT}.
     */
    private void waitUntilAllTaskManagerConnected() throws InterruptedException, TimeoutException {
        LOG.debug("Waiting for all TaskManagers connecting to JobManager");
        Preconditions.checkNotNull(this.restClusterClient, "REST cluster client should not be null when checking TaskManager status");
        CommonTestUtils.waitUtil(
                () -> {
                    final ClusterOverviewWithVersion overview;
                    try {
                        overview = this.restClusterClient
                                .sendRequest(
                                        ClusterOverviewHeaders.getInstance(),
                                        EmptyMessageParameters.getInstance(),
                                        EmptyRequestBody.getInstance())
                                .get();
                    } catch (Exception e) {
                        throw new RuntimeException("Failed to get cluster overview", e);
                    }
                    return overview.getNumTaskManagersConnected() == this.taskManagers.size();
                },
                DEFAULT_TIMEOUT,
                "TaskManagers are not ready within 30 seconds");
    }

    /**
     * Removes the contents of the configured checkpoint and HA storage directories by running
     * {@code rm -rf} inside the JobManager container. Does nothing if neither option is set.
     *
     * @throws IllegalStateException if the command exits with a non-zero code
     * @throws RuntimeException if the command cannot be executed or the thread is interrupted
     */
    private void deleteJobManagerTemporaryFiles() {
        final List<String> usedPaths =
                Lists.newArrayList(
                                this.conf.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY),
                                this.conf.get(HighAvailabilityOptions.HA_STORAGE_PATH))
                        .stream()
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList());
        if (usedPaths.isEmpty()) {
            return;
        }
        final StringBuilder deletionCommand = new StringBuilder("rm -rf");
        for (String usedPath : usedPaths) {
            deletionCommand.append(formatFilePathForDeletion(usedPath));
        }
        final String[] command = {"bash", "-c", deletionCommand.toString()};
        try {
            final Container.ExecResult result = this.jobManager.execInContainer(command);
            if (result.getExitCode() != 0) {
                throw new IllegalStateException(String.format("Command \"%s\" returned non-zero exit code %d. \nSTDOUT: %s\nSTDERR: %s", String.join(" ", command), result.getExitCode(), result.getStdout(), result.getStderr()));
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to delete temporary files generated by the flink cluster.", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to delete temporary files generated by the flink cluster.", e);
        }
    }

    /**
     * Turns a configured directory into an {@code rm} argument that matches the directory's
     * contents: a leading space, the path without its {@code file:} scheme, and a trailing
     * {@code /*}.
     *
     * <p>Paths without a {@code file:} prefix are used as given instead of failing with an
     * {@link ArrayIndexOutOfBoundsException}.
     *
     * @param path configured directory, with or without a {@code file:} prefix
     * @return the argument to append to the {@code rm -rf} command
     * @throws IllegalArgumentException if the path resolves to nothing or to the filesystem root,
     *     which would otherwise expand to {@code rm -rf /*}
     */
    private String formatFilePathForDeletion(String path) {
        // Paths.get collapses redundant slashes, e.g. "file:///dir" becomes "file:/dir".
        final String normalized = Paths.get(path).toString();
        final String localPath =
                normalized.startsWith(FILE_SCHEME_PREFIX)
                        ? normalized.substring(FILE_SCHEME_PREFIX.length())
                        : normalized;
        if (localPath.isEmpty() || "/".equals(localPath)) {
            throw new IllegalArgumentException(
                    "Refusing to delete files for path '" + path + "': it does not point to a directory below the root");
        }
        return " " + localPath + "/*";
    }

    /** Deletes a host-side temporary file or empty directory, logging instead of failing. */
    private static void deleteQuietly(Path path) {
        try {
            Files.deleteIfExists(path);
        } catch (IOException e) {
            LOG.warn("Failed to delete temporary path {}", path, e);
        }
    }
}
