package cn.sliew.flinkful.cli.descriptor;

import cn.sliew.flinkful.cli.base.PackageJarJob;
import java.io.File;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/sliew/flinkful/cli/descriptor/YarnApplicationCommand.class */
/**
 * {@link Command} that deploys a packaged jar job as a Flink application cluster on YARN.
 *
 * <p>The {@link Configuration} handed to {@link #submit} is modified in place: the deployment
 * target, memory sizes, slot count, provided lib dirs, dist jar and pipeline jars are all
 * written into it before the cluster descriptor is created.
 */
public class YarnApplicationCommand implements Command {
    private static final Logger log = LoggerFactory.getLogger(YarnApplicationCommand.class);

    /** Total process memory, in MiB, written for both the JobManager and the TaskManagers. */
    private static final long TOTAL_PROCESS_MEMORY_MEBIBYTES = 4096L;

    /** Number of task slots written for each TaskManager. */
    private static final int TASK_SLOTS_PER_TASK_MANAGER = 2;

    /** Value written to {@link YarnConfigOptions#PROVIDED_LIB_DIRS}. */
    private static final String PROVIDED_LIB_DIR = "hdfs://hadoop:9000/flink/1.13.6";

    /** Value written to {@link YarnConfigOptions#FLINK_DIST_JAR}. */
    private static final String FLINK_DIST_JAR = "hdfs://hadoop:9000/flink/1.13.6/flink-dist_2.11-1.13.6.jar";

    /**
     * Deploys {@code packageJarJob} as a YARN application cluster and logs the resulting cluster id.
     *
     * <p>Both the cluster descriptor and the cluster client are closed before this method returns,
     * whether deployment succeeds or fails, so no client-side resources outlive the call.
     *
     * @param configuration Flink configuration; mutated with the deployment settings
     * @param packageJarJob job description supplying the jar path, entry point class and program args
     * @throws Exception if the cluster descriptor cannot be created or the deployment fails
     */
    @Override
    public void submit(Configuration configuration, PackageJarJob packageJarJob) throws Exception {
        ClusterClientFactory<ApplicationId> clientFactory = createClientFactory(configuration);
        try (YarnClusterDescriptor clusterDescriptor = createClusterDescriptor(clientFactory, configuration, packageJarJob)) {
            ClusterSpecification clusterSpecification = YarnFlinkUtil.createClusterSpecification();
            ApplicationConfiguration applicationConfiguration =
                    new ApplicationConfiguration(packageJarJob.getProgramArgs(), packageJarJob.getEntryPointClass());
            try (ClusterClient<ApplicationId> clusterClient =
                    createClusterClient(clusterDescriptor, clusterSpecification, applicationConfiguration)) {
                log.info("deploy application with appId: {}", clusterClient.getClusterId());
            }
        }
    }

    /**
     * Sets the deployment target to the YARN application target and looks up the matching factory.
     *
     * @param configuration configuration to update and to resolve the factory from
     * @return the cluster client factory selected by the service loader
     */
    private ClusterClientFactory<ApplicationId> createClientFactory(Configuration configuration) {
        configuration.setString(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
        return new DefaultClusterClientServiceLoader().getClusterClientFactory(configuration);
    }

    /**
     * Writes the resource, library and user-jar settings into {@code configuration} and creates
     * the cluster descriptor from it.
     *
     * @param clusterClientFactory factory used to create the descriptor
     * @param configuration configuration to update; any existing values for these options are overwritten
     * @param packageJarJob job whose jar file path is registered under {@link PipelineOptions#JARS}
     * @return the descriptor created by the factory; the caller is responsible for closing it
     * @throws MalformedURLException retained from the original signature; nothing in this body throws it
     */
    private YarnClusterDescriptor createClusterDescriptor(ClusterClientFactory<ApplicationId> clusterClientFactory, Configuration configuration, PackageJarJob packageJarJob) throws MalformedURLException {
        long totalProcessMemoryBytes = MemorySize.ofMebiBytes(TOTAL_PROCESS_MEMORY_MEBIBYTES).getBytes();
        configuration.setLong(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), totalProcessMemoryBytes);
        configuration.setLong(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), totalProcessMemoryBytes);
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, TASK_SLOTS_PER_TASK_MANAGER);
        configuration.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(PROVIDED_LIB_DIR));
        configuration.set(YarnConfigOptions.FLINK_DIST_JAR, FLINK_DIST_JAR);
        ConfigUtils.encodeCollectionToConfig(
                configuration,
                PipelineOptions.JARS,
                Collections.singletonList(new File(packageJarJob.getJarFilePath())),
                File::toString);
        // The factory API is typed to the generic ClusterDescriptor, so an explicit downcast is
        // required here. The factory was resolved for the YARN application target, so the
        // descriptor is expected to be the YARN implementation.
        return (YarnClusterDescriptor) clusterClientFactory.createClusterDescriptor(configuration);
    }

    /**
     * Deploys the application cluster and obtains a client for it.
     *
     * @param yarnClusterDescriptor descriptor used to perform the deployment
     * @param clusterSpecification cluster specification passed to the deployment
     * @param applicationConfiguration program arguments and entry point class of the application
     * @return a client for the deployed cluster; the caller is responsible for closing it
     * @throws ClusterDeploymentException if the deployment fails
     */
    private ClusterClient<ApplicationId> createClusterClient(YarnClusterDescriptor yarnClusterDescriptor, ClusterSpecification clusterSpecification, ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException {
        return yarnClusterDescriptor
                .deployApplicationCluster(clusterSpecification, applicationConfiguration)
                .getClusterClient();
    }
}
