package edu.iu.dsc.tws.rsched.schedulers.nomad;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.config.SchedulerContext;
import edu.iu.dsc.tws.api.exceptions.TimeoutException;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.common.config.ConfigLoader;
import edu.iu.dsc.tws.common.logging.LoggingHelper;
import edu.iu.dsc.tws.common.zk.ZKJobMasterFinder;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.master.worker.JMWorkerAgent;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import edu.iu.dsc.tws.rsched.schedulers.mesos.MesosContext;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import edu.iu.dsc.tws.rsched.worker.MPIWorkerManager;
import java.io.File;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

/* loaded from: input_file:edu/iu/dsc/tws/rsched/schedulers/nomad/NomadWorkerStarter.class */
public final class NomadWorkerStarter {
    private static final Logger LOG = Logger.getLogger(NomadWorkerStarter.class.getName());
    private static int startingPort = 30000;
    private NomadController controller;
    private JMWorkerAgent masterClient;
    private Config config;
    private IWorkerController workerController;
    private JobAPI.Job job;

    private NomadWorkerStarter(String[] strArr) {
        Options options = null;
        try {
            options = setupOptions();
            this.config = loadConfigurations(new DefaultParser().parse(options, strArr), 0);
            this.controller = new NomadController(true);
            this.controller.initialize(this.config);
        } catch (ParseException e) {
            new HelpFormatter().printHelp("SubmitterMain", options);
            throw new RuntimeException("Error parsing command line options: ", e);
        }
    }

    public static void main(String[] strArr) {
        new NomadWorkerStarter(strArr).run();
    }

    public void run() {
        try {
            startWorker();
        } finally {
            closeWorker();
        }
    }

    private Options setupOptions() {
        Options options = new Options();
        Option build = Option.builder("c").desc("The class name of the container to launch").longOpt("container_class").hasArgs().argName("container class").required().build();
        Option build2 = Option.builder("d").desc("The class name of the container to launch").longOpt("config_dir").hasArgs().argName("configuration directory").required().build();
        Option build3 = Option.builder("t").desc("The class name of the container to launch").longOpt("twister2_home").hasArgs().argName("twister2 home").required().build();
        Option build4 = Option.builder("n").desc("The clustr type").longOpt("cluster_type").hasArgs().argName("cluster type").required().build();
        Option build5 = Option.builder("j").desc("Job id").longOpt("job_id").hasArgs().argName("job id").required().build();
        options.addOption(build3);
        options.addOption(build);
        options.addOption(build2);
        options.addOption(build4);
        options.addOption(build5);
        return options;
    }

    private Config loadConfigurations(CommandLine commandLine, int i) {
        String optionValue = commandLine.getOptionValue("twister2_home");
        String optionValue2 = commandLine.getOptionValue("container_class");
        String optionValue3 = commandLine.getOptionValue("config_dir");
        String optionValue4 = commandLine.getOptionValue("cluster_type");
        String optionValue5 = commandLine.getOptionValue("job_id");
        LOG.log(Level.FINE, String.format("Initializing process with twister_home: %s container_class: %s config_dir: %s cluster_type: %s", optionValue, optionValue2, optionValue3, optionValue4));
        Config loadConfig = ConfigLoader.loadConfig(optionValue, optionValue3, optionValue4);
        this.job = JobUtils.readJobFile(JobUtils.getJobDescriptionFilePath(optionValue5, Config.newBuilder().putAll(loadConfig).put(SchedulerContext.TWISTER2_HOME.getKey(), optionValue).put(MesosContext.MESOS_CONTAINER_CLASS, optionValue2).put("twister2.container.id", Integer.valueOf(i)).put("twister2.cluster.type", optionValue4).build()));
        this.job.getNumberOfWorkers();
        return Config.newBuilder().putAll(JobUtils.overrideConfigs(this.job, loadConfig)).put(SchedulerContext.TWISTER2_HOME.getKey(), optionValue).put(MesosContext.MESOS_CONTAINER_CLASS, optionValue2).put("twister2.container.id", Integer.valueOf(i)).put("twister2.cluster.type", optionValue4).put("twister2.job.id", this.job.getJobId()).put("twister2.resource.job.name", this.job.getJobName()).build();
    }

    private void startWorker() {
        LOG.log(Level.INFO, "A worker process is starting...");
        this.workerController = createWorkerController();
        this.workerController.getWorkerInfo();
        try {
            LOG.log(Level.INFO, "Worker IP..:" + Inet4Address.getLocalHost().getHostAddress());
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        try {
            this.workerController.getAllWorkers();
            new MPIWorkerManager().execute(this.config, this.job, this.workerController, null, null, JobUtils.initializeIWorker(this.job));
        } catch (TimeoutException e2) {
            LOG.log(Level.SEVERE, e2.getMessage(), e2);
        }
    }

    private IWorkerController createWorkerController() {
        String jobMasterIP;
        int jobMasterPort;
        String str = System.getenv("NOMAD_ALLOC_INDEX");
        String str2 = System.getenv("NOMAD_ALLOC_ID");
        int intValue = Integer.valueOf(str).intValue();
        initLogger(this.config, intValue);
        LOG.log(Level.INFO, String.format("Worker id = %s and index = %d", str2, Integer.valueOf(intValue)));
        Map<String, Integer> ports = getPorts(this.config);
        Map<String, String> iPAddress = getIPAddress(ports);
        int numberOfWorkers = this.job.getNumberOfWorkers();
        LOG.info("Worker Count..: " + numberOfWorkers);
        JobAPI.ComputeResource computeResource = JobUtils.getComputeResource(this.job, 0);
        int intValue2 = ports.get("worker").intValue();
        String str3 = iPAddress.get("worker");
        JobMasterAPI.WorkerInfo createWorkerInfo = WorkerInfoUtils.createWorkerInfo(intValue, str3, intValue2, NomadContext.getNodeInfo(this.config, str3), computeResource, ports);
        if (JobMasterContext.jobMasterRunsInClient(this.config)) {
            jobMasterIP = JobMasterContext.jobMasterIP(this.config);
            jobMasterPort = JobMasterContext.jobMasterPort(this.config);
        } else {
            ZKJobMasterFinder zKJobMasterFinder = new ZKJobMasterFinder(this.config, this.job.getJobId());
            zKJobMasterFinder.initialize();
            String jobMasterIPandPort = zKJobMasterFinder.getJobMasterIPandPort();
            if (jobMasterIPandPort == null) {
                LOG.info("Job Master has not joined yet. Will wait and try to get the address ...");
                jobMasterIPandPort = zKJobMasterFinder.waitAndGetJobMasterIPandPort(20000L);
                LOG.info("Job Master address: " + jobMasterIPandPort);
            } else {
                LOG.info("Job Master address: " + jobMasterIPandPort);
            }
            zKJobMasterFinder.close();
            jobMasterPort = Integer.parseInt(jobMasterIPandPort.substring(jobMasterIPandPort.lastIndexOf(":") + 1));
            jobMasterIP = jobMasterIPandPort.substring(0, jobMasterIPandPort.lastIndexOf(":"));
        }
        this.config = JobUtils.overrideConfigs(this.job, this.config);
        this.config = JobUtils.updateConfigs(this.job, this.config);
        LOG.info("Worker Count..: " + this.job.getNumberOfWorkers());
        this.masterClient = createMasterAgent(this.config, jobMasterIP, jobMasterPort, createWorkerInfo, numberOfWorkers);
        return this.masterClient.getJMWorkerController();
    }

    private JMWorkerAgent createMasterAgent(Config config, String str, int i, JobMasterAPI.WorkerInfo workerInfo, int i2) {
        JMWorkerAgent createJMWorkerAgent = JMWorkerAgent.createJMWorkerAgent(config, workerInfo, str, i, i2, 0);
        LOG.log(Level.INFO, String.format("Connecting to job master..: %s:%d", str, Integer.valueOf(i)));
        createJMWorkerAgent.startThreaded();
        return createJMWorkerAgent;
    }

    private Map<String, Integer> getPorts(Config config) {
        String[] split = NomadContext.networkPortNames(config).split(",");
        HashMap hashMap = new HashMap();
        for (String str : split) {
            hashMap.put(str, Integer.valueOf(Integer.valueOf(System.getenv("NOMAD_PORT_" + str)).intValue()));
        }
        return hashMap;
    }

    private void initLogger(Config config, int i) {
        LoggingHelper.setLoggingFormat("[%1$tF %1$tT] [%4$s] [%7$s] %3$s: %5$s %6$s %n");
        NomadContext.workingDirectory(config);
        String absolutePath = new NomadPersistentVolume(this.controller.createPersistentJobDirName(NomadContext.jobId(config)), i).getJobDir().getAbsolutePath();
        if (absolutePath == null) {
            return;
        }
        String str = absolutePath + "/logs";
        LOG.log(Level.INFO, "LOG DIR is ......: " + str);
        File file = new File(str);
        if (!file.exists() && !file.mkdirs()) {
            throw new RuntimeException("Failed to create log directory: " + str);
        }
        LoggingHelper.setupLogging(config, str, "worker-" + i);
    }

    private String getTaskDirectory() {
        return System.getenv("NOMAD_TASK_DIR");
    }

    private Map<String, String> getIPAddress(Map<String, Integer> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), System.getenv("NOMAD_IP_" + entry.getKey()));
        }
        return hashMap;
    }

    public void closeWorker() {
        this.masterClient.sendWorkerCompletedMessage(JobMasterAPI.WorkerState.COMPLETED);
        this.masterClient.close();
    }
}
