package cn.tenmg.flink.jobs.launcher;

import cn.tenmg.flink.jobs.FlinkJobsLauncher;
import cn.tenmg.flink.jobs.config.model.FlinkJobs;
import cn.tenmg.flink.jobs.config.model.Operate;
import cn.tenmg.flink.jobs.launcher.AbstractFlinkJobsLauncher;
import cn.tenmg.flink.jobs.launcher.context.FlinkJobsLauncherContext;
import cn.tenmg.flink.jobs.launcher.utils.Sets;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.StringReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/tenmg/flink/jobs/launcher/RestClusterClientFlinkJobsLauncher.class */
/**
 * {@link AbstractFlinkJobsLauncher} implementation that packages a flink-jobs program and either
 * runs it inside the current JVM or submits its {@link JobGraph} to a standalone Flink cluster
 * through a {@link RestClusterClient}.
 *
 * <p>One {@link Configuration} is prepared per entry of the {@code jobmanager.rpc.servers}
 * launcher property (or a single one from the launcher properties when that property is blank).
 * The configurations are handed out in rotation, and a failed submission is retried against the
 * next one until each has been tried once.
 */
public class RestClusterClientFlinkJobsLauncher extends AbstractFlinkJobsLauncher {
    private static final Logger log = LoggerFactory.getLogger(RestClusterClientFlinkJobsLauncher.class);

    /** Port applied to a server entry that names no port when the base configuration has none either. */
    private static final int DEFAULT_JOB_MANAGER_PORT = 6123;

    /** Rotating pool of cluster configurations; guarded by the class lock in {@link #getConfiguration()}. */
    private static final Queue<Configuration> configurations = new LinkedList<>();

    /** Operate types that do not, on their own, require submitting the program to the cluster. */
    private static final Set<String> localOperates = Sets.as("Jdbc");

    /** Number of pooled configurations, which is also the maximum number of submission attempts. */
    private static final int COUNT;

    static {
        Configuration base = ConfigurationUtils.createConfiguration(FlinkJobsLauncherContext.getConfigProperties());
        String servers = FlinkJobsLauncherContext.getProperty("jobmanager.rpc.servers");
        if (!StringUtils.isBlank(servers)) {
            for (String server : servers.split(",")) {
                if (StringUtils.isBlank(server)) {
                    // Tolerate stray separators such as "a,,b" or a trailing comma.
                    continue;
                }
                String[] hostAndPort = server.split(":", 2);
                String port = hostAndPort.length > 1 ? hostAndPort[1].trim() : "";
                Configuration serverConfiguration = base.clone();
                serverConfiguration.set(JobManagerOptions.ADDRESS, hostAndPort[0].trim());
                if (!port.isEmpty()) {
                    serverConfiguration.set(JobManagerOptions.PORT, Integer.parseInt(port));
                } else if (!serverConfiguration.contains(JobManagerOptions.PORT)) {
                    serverConfiguration.set(JobManagerOptions.PORT, DEFAULT_JOB_MANAGER_PORT);
                }
                configurations.add(serverConfiguration);
            }
        }
        if (configurations.isEmpty()) {
            // No usable server entry: fall back to the plain launcher configuration so the pool
            // is never empty and getConfiguration() never yields null.
            configurations.add(base);
        }
        COUNT = configurations.size();
    }

    /**
     * Builds the packaged program for {@code flinkJobs} and executes it.
     *
     * <p>The {@code classpaths}, {@code parallelism}, {@code fromSavepoint} and
     * {@code allowNonRestoredState} entries of the job options override the launcher-level
     * defaults. When the job has no service name and all of its operates are local ones, the
     * program is invoked in this JVM; otherwise its job graph is submitted to the cluster.
     *
     * @param flinkJobs the job description to launch
     * @return the submitted job's id and state, or {@code null} when the program was run in this JVM
     * @throws Exception if the program cannot be built or run, or if every submission attempt fails
     */
    @Override
    public AbstractFlinkJobsLauncher.FlinkJobsInfo launch(FlinkJobs flinkJobs) throws Exception {
        HashMap<?, ?> options = flinkJobs.getOptions();
        String classpaths = FlinkJobsLauncherContext.getProperty("classpaths");
        String parallelism = FlinkJobsLauncherContext.getProperty("parallelism.default", "1");
        SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.none();
        if (options != null && !options.isEmpty()) {
            if (options.containsKey("classpaths")) {
                classpaths = (String) options.get("classpaths");
            }
            if (options.containsKey("parallelism")) {
                parallelism = (String) options.get("parallelism");
            }
            if (options.containsKey("fromSavepoint")) {
                String savepointPath = (String) options.get("fromSavepoint");
                if (options.containsKey("allowNonRestoredState")) {
                    restoreSettings = SavepointRestoreSettings.forPath(savepointPath,
                            "true".equals(options.get("allowNonRestoredState")));
                } else {
                    restoreSettings = SavepointRestoreSettings.forPath(savepointPath);
                }
            }
        }

        Configuration configuration = getConfiguration();
        PackagedProgram.Builder builder = PackagedProgram.newBuilder()
                .setConfiguration(configuration)
                .setEntryPointClassName(getEntryPointClassName(flinkJobs))
                .setJarFile(new File(getJar(flinkJobs)))
                .setUserClassPaths(toURLs(classpaths))
                .setSavepointRestoreSettings(restoreSettings);
        String arguments = getArguments(flinkJobs);
        if (!isEmptyArguments(arguments).booleanValue()) {
            builder.setArguments(new String[]{arguments});
        }

        boolean submitToCluster = requiresCluster(flinkJobs);
        PackagedProgram program = builder.build();
        boolean suppressOutput = Boolean.parseBoolean(FlinkJobsLauncherContext.getProperty("suppress.output", "false"));
        if (!submitToCluster) {
            runLocally(program, suppressOutput);
            return null;
        }

        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration,
                Integer.parseInt(parallelism), suppressOutput);
        return submitWithRetry(jobGraph, configuration, toProperties(flinkJobs.getConfiguration()));
    }

    /**
     * Stops the given job with a savepoint written to the directory named by the
     * {@code state.savepoints.dir} launcher property.
     *
     * @param str hexadecimal id of the job to stop
     * @return the value produced by the client's stop-with-savepoint call (the savepoint location
     *         according to Flink's {@code ClusterClient} API)
     * @throws Exception if the client cannot be created or the stop request fails
     */
    @Override
    public String stop(String str) throws Exception {
        try (RestClusterClient<StandaloneClusterId> client = getRestClusterClient(getConfiguration(), null)) {
            return client.stopWithSavepoint(JobID.fromHexString(str), false,
                    FlinkJobsLauncherContext.getProperty("state.savepoints.dir")).get();
        }
    }

    /**
     * Tells whether the job must go to the cluster: it does when it has a service name or when at
     * least one of its operates is not of a local type.
     */
    private static boolean requiresCluster(FlinkJobs flinkJobs) {
        if (flinkJobs.getServiceName() != null) {
            return true;
        }
        List<?> operates = flinkJobs.getOperates();
        if (operates == null) {
            return false;
        }
        for (Object operate : operates) {
            if (!localOperates.contains(((Operate) operate).getType())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Invokes the program's entry point in this JVM with the user-code class loader installed as
     * the thread context class loader, optionally discarding everything written to
     * {@code System.out} / {@code System.err} meanwhile. The previous streams and class loader are
     * restored whether or not the invocation succeeds.
     */
    private static void runLocally(PackagedProgram program, boolean suppressOutput) throws Exception {
        Thread currentThread = Thread.currentThread();
        ClassLoader previousLoader = currentThread.getContextClassLoader();
        PrintStream previousOut = System.out;
        PrintStream previousErr = System.err;
        currentThread.setContextClassLoader(program.getUserCodeClassLoader());
        try {
            if (suppressOutput) {
                System.setOut(new PrintStream(new ByteArrayOutputStream()));
                System.setErr(new PrintStream(new ByteArrayOutputStream()));
            }
            program.invokeInteractiveModeForExecution();
        } finally {
            if (suppressOutput) {
                System.setOut(previousOut);
                System.setErr(previousErr);
            }
            currentThread.setContextClassLoader(previousLoader);
        }
    }

    /**
     * Submits the job graph, moving on to the next pooled configuration after a failure. At most
     * {@link #COUNT} attempts are made; the failure of the last attempt is propagated to the
     * caller instead of being logged and swallowed.
     *
     * @param jobGraph           the graph to submit
     * @param firstConfiguration configuration used for the first attempt
     * @param properties         job-level overrides merged into each configuration, may be {@code null}
     */
    private AbstractFlinkJobsLauncher.FlinkJobsInfo submitWithRetry(JobGraph jobGraph,
            Configuration firstConfiguration, Properties properties) throws Exception {
        Configuration target = firstConfiguration;
        for (int attempt = 1; ; attempt++) {
            // The client is closed by try-with-resources before the catch clause runs.
            try (RestClusterClient<StandaloneClusterId> client = getRestClusterClient(target, properties)) {
                return submitJob(client, jobGraph);
            } catch (Exception e) {
                if (attempt >= COUNT) {
                    throw e;
                }
                log.error("Try to submit job fail", e);
                target = getConfiguration();
            }
        }
    }

    /**
     * Submits the job graph, then queries the job status once and translates it into the
     * launcher's state model. Any status not explicitly mapped is reported as {@code SUBMITTED}.
     */
    private static AbstractFlinkJobsLauncher.FlinkJobsInfo submitJob(RestClusterClient<StandaloneClusterId> restClusterClient, JobGraph jobGraph) throws Exception {
        JobID jobId = restClusterClient.submitJob(jobGraph).get();
        AbstractFlinkJobsLauncher.FlinkJobsInfo info = new AbstractFlinkJobsLauncher.FlinkJobsInfo();
        info.setJobId(jobId.toString());
        JobStatus status = restClusterClient.getJobStatus(jobId).get();
        if (status == JobStatus.INITIALIZING) {
            info.setState(FlinkJobsLauncher.FlinkJobsInfo.State.ACCEPTED);
        } else if (status == JobStatus.RUNNING) {
            info.setState(FlinkJobsLauncher.FlinkJobsInfo.State.RUNNING);
        } else if (status == JobStatus.FINISHED) {
            info.setState(FlinkJobsLauncher.FlinkJobsInfo.State.FINISHED);
        } else if (status == JobStatus.FAILED) {
            info.setState(FlinkJobsLauncher.FlinkJobsInfo.State.FAILED);
        } else if (status == JobStatus.CANCELED) {
            info.setState(FlinkJobsLauncher.FlinkJobsInfo.State.KILLED);
        } else {
            info.setState(FlinkJobsLauncher.FlinkJobsInfo.State.SUBMITTED);
        }
        return info;
    }

    /**
     * Returns the configuration at the head of the pool and moves it to the tail, so successive
     * calls cycle through all pooled configurations.
     */
    private static synchronized Configuration getConfiguration() {
        Configuration next = configurations.poll();
        configurations.add(next);
        return next;
    }

    /**
     * Creates a client for the given configuration. When {@code properties} is non-null they are
     * layered on top of a copy of the configuration, leaving the pooled instance untouched.
     */
    private RestClusterClient<StandaloneClusterId> getRestClusterClient(Configuration configuration, Properties properties) throws Exception {
        if (properties == null) {
            return newRestClusterClient(configuration);
        }
        Configuration merged = configuration.clone();
        merged.addAll(ConfigurationUtils.createConfiguration(properties));
        return newRestClusterClient(merged);
    }

    /** Instantiates a REST client bound to the standalone cluster id. */
    private RestClusterClient<StandaloneClusterId> newRestClusterClient(Configuration configuration) throws Exception {
        return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
    }

    /**
     * Parses a list of class path URLs. Entries are separated by {@code ;} when the text contains
     * one, otherwise by {@code ,}; each entry is trimmed before parsing.
     *
     * @param str the raw list, may be {@code null} or blank
     * @return the parsed URLs, or an empty list for {@code null} / blank input
     * @throws MalformedURLException if an entry is not a valid URL
     */
    private static List<URL> toURLs(String str) throws MalformedURLException {
        if (str == null || str.trim().isEmpty()) {
            return Collections.emptyList();
        }
        String[] entries = str.split(str.contains(";") ? ";" : ",");
        List<URL> urls = new ArrayList<>(entries.length);
        for (String entry : entries) {
            urls.add(new URL(entry.trim()));
        }
        return urls;
    }

    /**
     * Parses text in {@link Properties} format.
     *
     * @param str the properties text, may be {@code null}
     * @return the loaded properties, or {@code null} when {@code str} is {@code null}
     * @throws IOException if the text cannot be read
     */
    private static Properties toProperties(String str) throws IOException {
        if (str == null) {
            return null;
        }
        Properties properties = new Properties();
        properties.load(new StringReader(str));
        return properties;
    }
}
