package cn.sliew.flinkful.cli.descriptor;

import cn.sliew.flinkful.cli.base.FlinkUtil;
import cn.sliew.flinkful.cli.base.PackageJarJob;
import java.net.URI;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

/* loaded from: input_file:cn/sliew/flinkful/cli/descriptor/MiniClusterCommand.class */
public class MiniClusterCommand implements Command {
    @Override // cn.sliew.flinkful.cli.descriptor.Command
    public void submit(Configuration configuration, PackageJarJob packageJarJob) throws Exception {
        System.out.println((JobID) createClusterClient(createCluster(configuration), configuration).submitJob(PackagedProgramUtils.createJobGraph(FlinkUtil.buildProgram(configuration, packageJarJob), configuration, 1, false)).get());
    }

    private MiniCluster createCluster(Configuration configuration) throws Exception {
        configuration.setInteger(JobManagerOptions.PORT, ((Integer) JobManagerOptions.PORT.defaultValue()).intValue());
        configuration.setInteger("local.number-taskmanager", 1);
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, ((Integer) TaskManagerOptions.NUM_TASK_SLOTS.defaultValue()).intValue());
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().setConfiguration(configuration).build());
        miniCluster.start();
        return miniCluster;
    }

    private MiniClusterClient createClusterClient(MiniCluster miniCluster, Configuration configuration) throws ExecutionException, InterruptedException {
        URI uri = (URI) miniCluster.getRestAddress().get();
        configuration.setString(JobManagerOptions.ADDRESS, uri.getHost());
        configuration.setInteger(JobManagerOptions.PORT, uri.getPort());
        configuration.setString(RestOptions.ADDRESS, uri.getHost());
        configuration.setInteger(RestOptions.PORT, uri.getPort());
        configuration.setString(DeploymentOptions.TARGET, "local");
        return new MiniClusterClient(configuration, miniCluster);
    }
}
