package cn.wjee.gradle.flink.api;

import cn.wjee.gradle.flink.config.FlinkClusterClientBuilder;
import cn.wjee.gradle.flink.config.FlinkExtConfig;
import java.io.File;
import java.io.FileNotFoundException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/wjee/gradle/flink/api/FlinkSubmitterApi.class */
/**
 * Static helpers for talking to a Flink cluster through a {@link RestClusterClient}:
 * building the client, looking up running jobs by name, cancelling them with a
 * savepoint and submitting a packaged job jar.
 *
 * <p>All methods are best-effort: failures are logged and reported to the caller
 * through an empty/{@code null} result (or nothing at all for {@code void} methods)
 * rather than through an exception.
 */
public class FlinkSubmitterApi {
    private static final Logger log = LoggerFactory.getLogger(FlinkSubmitterApi.class);

    /**
     * Builds a REST cluster client from the connection settings in the given config.
     *
     * @param flinkExtConfig supplies the host, REST port and RPC port
     * @return the client produced by {@link FlinkClusterClientBuilder}; the caller owns it
     *         and is responsible for closing it
     */
    public static RestClusterClient<StandaloneClusterId> getFlinkClient(FlinkExtConfig flinkExtConfig) {
        return FlinkClusterClientBuilder.builder()
                .defaultSsl()
                .address(flinkExtConfig.getHost(), flinkExtConfig.getRestPort(), flinkExtConfig.getRpcPort())
                .build();
    }

    /**
     * Lists the IDs of all jobs that are currently {@link JobStatus#RUNNING} and whose
     * name equals {@code str}.
     *
     * <p>The job listing is awaited for at most 15 seconds.
     *
     * @param restClusterClient client used to query the cluster
     * @param str               job name to match exactly
     * @return the matching job IDs; an empty list when nothing matches or the lookup
     *         fails for any reason (the failure is logged)
     */
    public static List<JobID> getJobIdV2(RestClusterClient<StandaloneClusterId> restClusterClient, String str) {
        try {
            return restClusterClient.listJobs().get(15L, TimeUnit.SECONDS).stream()
                    .filter(message -> message.getJobName().equals(str))
                    .filter(message -> message.getJobState() == JobStatus.RUNNING)
                    .map(message -> message.getJobId())
                    .collect(Collectors.toList());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            log.warn("list running jobs interrupted, jobName={}", str, e);
            return new ArrayList<>();
        } catch (Exception e) {
            log.warn("list running jobs fail, jobName={}", str, e);
            return new ArrayList<>();
        }
    }

    /**
     * Requests cancellation of a job, taking a savepoint first.
     *
     * <p>NOTE(review): the value returned by {@code cancelWithSavepoint} is discarded and
     * not awaited here; if that call is asynchronous, the job may still be running when
     * this method returns - confirm against the Flink version in use.
     *
     * @param restClusterClient client used to talk to the cluster
     * @param jobID             job to cancel
     * @param str               savepoint location passed through to the client
     */
    public static void cancelJob(RestClusterClient<StandaloneClusterId> restClusterClient, JobID jobID, String str) {
        try {
            restClusterClient.cancelWithSavepoint(jobID, str);
        } catch (Exception e) {
            // Best-effort: a failed cancel must not abort the caller, but should be visible.
            log.warn("cancel job fail, jobId={}", jobID, e);
        }
    }

    /**
     * Packages the configured job jar into a job graph and submits it, blocking until the
     * submission future completes.
     *
     * <p>Before submitting, running jobs with the configured job name are cancelled with a
     * savepoint and the ID of the first one is reused for the new job graph. Any exception
     * is logged as {@code "submit Job fail"} and not rethrown. The cluster client and the
     * packaged program are always closed, on success and on failure alike.
     *
     * @param flinkExtConfig job jar path, program arguments, parallelism, job name,
     *                       savepoint location and cluster connection settings
     */
    public static void submitJob(FlinkExtConfig flinkExtConfig) {
        RestClusterClient<StandaloneClusterId> client = null;
        PackagedProgram program = null;
        try {
            File jarFile = Paths.get(flinkExtConfig.getJobJar()).toAbsolutePath().toFile();
            if (!jarFile.exists()) {
                throw new FileNotFoundException("JarFile Not Exists");
            }
            client = getFlinkClient(flinkExtConfig);
            program = PackagedProgram.newBuilder()
                    .setJarFile(jarFile)
                    .setArguments(flinkExtConfig.getJobArgs())
                    .build();
            JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
                    program,
                    client.getFlinkConfiguration(),
                    flinkExtConfig.getParallelism().intValue(),
                    false);
            JobID previousJobId = getJobIdBeforeSubmit(client, flinkExtConfig);
            if (previousJobId != null) {
                jobGraph.setJobID(previousJobId);
            }
            client.submitJob(jobGraph).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            log.error("submit Job fail", e);
        } catch (Exception e) {
            log.error("submit Job fail", e);
        } finally {
            // Close each resource independently so one failing close cannot leak the other.
            if (client != null) {
                try {
                    client.close();
                } catch (Exception e) {
                    log.warn("close flink client fail", e);
                }
            }
            if (program != null) {
                try {
                    program.close();
                } catch (Exception e) {
                    log.warn("close packaged program fail", e);
                }
            }
        }
    }

    /**
     * Cancels (with savepoint) every running job named like the configured job and returns
     * the ID of the first of them, so the new submission can reuse it.
     *
     * @param restClusterClient client used to query and cancel jobs
     * @param flinkExtConfig    supplies the job name and the savepoint location
     * @return the first matching running job's ID, or {@code null} when there is none or
     *         the lookup/cancel step fails
     */
    private static JobID getJobIdBeforeSubmit(RestClusterClient<StandaloneClusterId> restClusterClient, FlinkExtConfig flinkExtConfig) {
        try {
            List<JobID> runningJobIds = getJobIdV2(restClusterClient, flinkExtConfig.getJobName());
            if (runningJobIds == null || runningJobIds.isEmpty()) {
                return null;
            }
            for (JobID runningJobId : runningJobIds) {
                cancelJob(restClusterClient, runningJobId, flinkExtConfig.getSavePoint());
            }
            return runningJobIds.get(0);
        } catch (Exception e) {
            log.warn("cancel running jobs before submit fail", e);
            return null;
        }
    }
}
