package cn.sliew.flinkful.cli.descriptor;

import cn.sliew.flinkful.cli.base.PackageJarJob;
import cn.sliew.milky.common.util.RamUsageEstimator;
import java.io.File;
import java.util.Collections;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/sliew/flinkful/cli/descriptor/KubernetesApplicationCommand.class */
public class KubernetesApplicationCommand implements Command {
    private static final Logger log = LoggerFactory.getLogger(KubernetesApplicationCommand.class);

    @Override // cn.sliew.flinkful.cli.descriptor.Command
    public void submit(Configuration configuration, PackageJarJob packageJarJob) throws Exception {
        createClusterClient(createClusterDescriptor(createClientFactory(configuration), configuration, packageJarJob), YarnFlinkUtil.createClusterSpecification(), new ApplicationConfiguration(packageJarJob.getProgramArgs(), packageJarJob.getEntryPointClass()));
    }

    private ClusterClientFactory<String> createClientFactory(Configuration configuration) {
        configuration.setString(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
        return new DefaultClusterClientServiceLoader().getClusterClientFactory(configuration);
    }

    private KubernetesClusterDescriptor createClusterDescriptor(ClusterClientFactory<String> clusterClientFactory, Configuration configuration, PackageJarJob packageJarJob) {
        configuration.setLong(JobManagerOptions.TOTAL_PROCESS_MEMORY.key(), MemorySize.ofMebiBytes(RamUsageEstimator.ONE_KB).getBytes());
        configuration.setLong(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), MemorySize.ofMebiBytes(RamUsageEstimator.ONE_KB).getBytes());
        ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, Collections.singletonList(new File(packageJarJob.getJarFilePath())), (v0) -> {
            return v0.toString();
        });
        configuration.setString(KubernetesConfigOptions.CONTAINER_IMAGE, buildImage());
        return clusterClientFactory.createClusterDescriptor(configuration);
    }

    private String buildImage() {
        return "flink-example:1";
    }

    private ClusterClient<String> createClusterClient(KubernetesClusterDescriptor kubernetesClusterDescriptor, ClusterSpecification clusterSpecification, ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException {
        ClusterClient<String> clusterClient = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration).getClusterClient();
        log.info("deploy application with clusterId: {}", clusterClient.getClusterId());
        return clusterClient;
    }
}
