package org.apache.flink.connector.testframe.environment;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
/* loaded from: input_file:org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.class */
public class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable {
    private static final Logger LOG = LoggerFactory.getLogger(MiniClusterTestEnvironment.class);
    private final MiniClusterWithClientResource miniCluster;
    private final Path checkpointPath;
    private int latestTMIndex;
    private boolean isStarted;

    public MiniClusterTestEnvironment() {
        this(defaultMiniClusterResourceConfiguration());
    }

    public MiniClusterTestEnvironment(MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
        this.latestTMIndex = 0;
        this.isStarted = false;
        this.miniCluster = new MiniClusterWithClientResource(miniClusterResourceConfiguration);
        try {
            this.checkpointPath = Files.createTempDirectory("minicluster-environment-checkpoint-", new FileAttribute[0]);
        } catch (IOException e) {
            throw new RuntimeException("Failed to create temporary checkpoint directory", e);
        }
    }

    private static MiniClusterResourceConfiguration defaultMiniClusterResourceConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, 1000L);
        return new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(6).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build();
    }

    @Override // org.apache.flink.connector.testframe.environment.TestEnvironment
    public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings testEnvironmentSettings) {
        Configuration configuration = new Configuration();
        if (testEnvironmentSettings.getSavepointRestorePath() != null) {
            configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, testEnvironmentSettings.getSavepointRestorePath());
        }
        return new TestStreamEnvironment(this.miniCluster.getMiniCluster(), configuration, this.miniCluster.getNumberSlots(), (Collection) testEnvironmentSettings.getConnectorJarPaths().stream().map(url -> {
            return new org.apache.flink.core.fs.Path(url.getPath());
        }).collect(Collectors.toList()), Collections.emptyList());
    }

    @Override // org.apache.flink.connector.testframe.environment.TestEnvironment
    public TestEnvironment.Endpoint getRestEndpoint() {
        try {
            URI uri = (URI) this.miniCluster.getMiniCluster().getRestAddress().get();
            return new TestEnvironment.Endpoint(uri.getHost(), uri.getPort());
        } catch (Exception e) {
            throw new RuntimeException("Failed to get REST endpoint of MiniCluster", e);
        }
    }

    @Override // org.apache.flink.connector.testframe.environment.TestEnvironment
    public String getCheckpointUri() {
        return this.checkpointPath.toUri().toString();
    }

    @Override // org.apache.flink.connector.testframe.environment.ClusterControllable
    public void triggerJobManagerFailover(JobClient jobClient, Runnable runnable) throws ExecutionException, InterruptedException {
        Optional haLeadershipControl = this.miniCluster.getMiniCluster().getHaLeadershipControl();
        if (!haLeadershipControl.isPresent()) {
            throw new UnsupportedOperationException("This MiniCluster does not support JobManager HA");
        }
        HaLeadershipControl haLeadershipControl2 = (HaLeadershipControl) haLeadershipControl.get();
        haLeadershipControl2.revokeJobMasterLeadership(jobClient.getJobID()).get();
        runnable.run();
        haLeadershipControl2.grantJobMasterLeadership(jobClient.getJobID()).get();
    }

    @Override // org.apache.flink.connector.testframe.environment.ClusterControllable
    public void triggerTaskManagerFailover(JobClient jobClient, Runnable runnable) throws Exception {
        terminateTaskManager();
        CommonTestUtils.waitForNoTaskRunning(() -> {
            return (JobDetailsInfo) this.miniCluster.getRestClusterClient().getJobDetails(jobClient.getJobID()).get();
        });
        runnable.run();
        startTaskManager();
    }

    @Override // org.apache.flink.connector.testframe.environment.ClusterControllable
    public void isolateNetwork(JobClient jobClient, Runnable runnable) {
        throw new UnsupportedOperationException("Cannot isolate network in a MiniCluster");
    }

    @Override // org.apache.flink.connector.testframe.TestResource
    public void startUp() throws Exception {
        if (this.isStarted) {
            return;
        }
        this.miniCluster.before();
        LOG.debug("MiniCluster is running");
        this.isStarted = true;
    }

    @Override // org.apache.flink.connector.testframe.TestResource
    public void tearDown() throws Exception {
        if (this.isStarted) {
            this.isStarted = false;
            this.miniCluster.after();
            FileUtils.deleteDirectory(this.checkpointPath.toFile());
            LOG.debug("MiniCluster has been tear down");
        }
    }

    private void terminateTaskManager() throws Exception {
        this.miniCluster.getMiniCluster().terminateTaskManager(this.latestTMIndex).get();
        LOG.debug("TaskManager {} has been terminated.", Integer.valueOf(this.latestTMIndex));
    }

    private void startTaskManager() throws Exception {
        this.miniCluster.getMiniCluster().startTaskManager();
        this.latestTMIndex++;
        LOG.debug("New TaskManager {} has been launched.", Integer.valueOf(this.latestTMIndex));
    }

    public String toString() {
        return "MiniCluster";
    }
}
